Skip to content

Commit

Permalink
first pass at slimming down the worker and docs
Browse files Browse the repository at this point in the history
  • Loading branch information
♠ ace hacker committed Dec 24, 2012
1 parent adcd426 commit 7682ccf
Show file tree
Hide file tree
Showing 14 changed files with 224 additions and 654 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
etc/
*.gem
.env
Gemfile.lock
Gemfile.lock
tips-and-tricks.md
1 change: 0 additions & 1 deletion .gitiosk

This file was deleted.

8 changes: 0 additions & 8 deletions .travis.yml

This file was deleted.

11 changes: 0 additions & 11 deletions MIT-LICENSE

This file was deleted.

38 changes: 0 additions & 38 deletions benchmark.rb

This file was deleted.

6 changes: 6 additions & 0 deletions changelog
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
Version 2.1.0
- wrap connection execution in mutex making it thread safe
- cleanup logging
- refactor worker class making it more extensible
- added rdoc style docs for worker class

Version 2.0.5
- allow term signal to halt the lock_job function

Expand Down
54 changes: 29 additions & 25 deletions lib/queue_classic/conn.rb
Original file line number Diff line number Diff line change
@@ -1,46 +1,51 @@
require 'thread'

module QC
module Conn
extend self
@exec_mutex = Mutex.new

def execute(stmt, *params)
log(:level => :debug, :action => "exec_sql", :sql => stmt.inspect)
begin
params = nil if params.empty?
r = connection.exec(stmt, params)
result = []
r.each {|t| result << t}
result.length > 1 ? result : result.pop
rescue PGError => e
log(:error => e.inspect)
disconnect
raise
@exec_mutex.synchronize do
log(:at => "exec_sql", :sql => stmt.inspect)
begin
params = nil if params.empty?
r = connection.exec(stmt, params)
result = []
r.each {|t| result << t}
result.length > 1 ? result : result.pop
rescue PGError => e
log(:error => e.inspect)
disconnect
raise
end
end
end

def notify(chan)
log(:level => :debug, :action => "NOTIFY")
log(:at => "NOTIFY")
execute('NOTIFY "' + chan + '"') #quotes matter
end

def listen(chan)
log(:level => :debug, :action => "LISTEN")
log(:at => "LISTEN")
execute('LISTEN "' + chan + '"') #quotes matter
end

def unlisten(chan)
log(:level => :debug, :action => "UNLISTEN")
log(:at => "UNLISTEN")
execute('UNLISTEN "' + chan + '"') #quotes matter
end

def drain_notify
until connection.notifies.nil?
log(:level => :debug, :action => "drain_notifications")
log(:at => "drain_notifications")
end
end

def wait_for_notify(t)
connection.wait_for_notify(t) do |event, pid, msg|
log(:level => :debug, :action => "received_notification")
log(:at => "received_notification")
end
end

Expand All @@ -65,22 +70,21 @@ def connection

def connection=(connection)
unless connection.instance_of? PG::Connection
raise(
ArgumentError,
"connection must be an instance of PG::Connection, but was #{connection.class}"
)
c = connection.class
err = "connection must be an instance of PG::Connection, but was #{c}"
raise(ArgumentError, err)
end
@connection = connection
end

def disconnect
connection.finish
ensure
@connection = nil
begin connection.finish
ensure @connection = nil
end
end

def connect
log(:level => :debug, :action => "establish_conn")
log(:at => "establish_conn")
conn = PGconn.connect(
db_url.host,
db_url.port || 5432,
Expand All @@ -90,7 +94,7 @@ def connect
db_url.password
)
if conn.status != PGconn::CONNECTION_OK
log(:level => :error, :message => conn.error)
log(:error => conn.error)
end
conn
end
Expand Down
2 changes: 1 addition & 1 deletion lib/queue_classic/queries.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ module Queries

def insert(q_name, method, args, chan=nil)
QC.log_yield(:action => "insert_job") do
s = "INSERT INTO #{TABLE_NAME} (q_name, method, args) VALUES ($1, $2, $3)"
s="INSERT INTO #{TABLE_NAME} (q_name, method, args) VALUES ($1, $2, $3)"
res = Conn.execute(s, q_name, method, OkJson.encode(args))
Conn.notify(chan) if chan
end
Expand Down
1 change: 0 additions & 1 deletion lib/queue_classic/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ module QC
class Queue

attr_reader :name, :chan

def initialize(name, notify=QC::LISTENING_WORKER)
@name = name
@chan = @name if notify
Expand Down
5 changes: 4 additions & 1 deletion lib/queue_classic/tasks.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
namespace :qc do
desc "Start a new worker for the (default or $QUEUE) queue"
task :work => :environment do
QC::Worker.new.start
trap('INT') {exit}
trap('TERM') {@worker.stop}
@worker = QC::Worker.new
@worker.start
end

desc "Returns the number of jobs in the (default or QUEUE) queue"
Expand Down
Loading

0 comments on commit 7682ccf

Please sign in to comment.