Skip to content

Commit

Permalink
Merge pull request #175 from ryandotsmith/no-pool-fork-concurr
Browse files Browse the repository at this point in the history
Reduction: Remove conn pool
  • Loading branch information
ryandotsmith committed Sep 5, 2013
2 parents 227e75f + 2b31349 commit ab0785d
Show file tree
Hide file tree
Showing 12 changed files with 106 additions and 211 deletions.
6 changes: 4 additions & 2 deletions lib/generators/queue_classic/install_generator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,17 @@ class InstallGenerator < Rails::Generators::Base

namespace "queue_classic:install"
self.source_paths << File.join(File.dirname(__FILE__), 'templates')
desc 'Generates (but does not run) a migration to add a queue_classic table.'
desc 'Generates (but does not run) a migration to add ' +
'a queue_classic table.'

def self.next_migration_number(dirname)
next_migration_number = current_migration_number(dirname) + 1
ActiveRecord::Migration.next_migration_number(next_migration_number)
end

def create_migration_file
migration_template 'add_queue_classic.rb', 'db/migrate/add_queue_classic.rb'
migration_template 'add_queue_classic.rb',
'db/migrate/add_queue_classic.rb'
end
end
end
31 changes: 0 additions & 31 deletions lib/queue_classic.rb
Original file line number Diff line number Diff line change
@@ -1,35 +1,4 @@
module QC
# You can use the APP_NAME to query for
# postgres related process information in the
# pg_stat_activity table.
APP_NAME = ENV["QC_APP_NAME"] || "queue_classic"

# Number of seconds to block on the listen chanel for new jobs.
WAIT_TIME = (ENV["QC_LISTEN_TIME"] || 5).to_i

# Why do you want to change the table name?
# Just deal with the default OK?
# If you do want to change this, you will
# need to update the PL/pgSQL lock_head() function.
# Come on. Don't do it.... Just stick with the default.
TABLE_NAME = "queue_classic_jobs"

# Each row in the table will have a column that
# notes the queue. You can point your workers
# at different queues but only one at a time.
QUEUE = ENV["QUEUE"] || "default"

# Set this to 1 for strict FIFO.
# There is nothing special about 9....
TOP_BOUND = (ENV["QC_TOP_BOUND"] || 9).to_i

# Set this variable if you wish for
# the worker to fork a UNIX process for
# each locked job. Remember to re-establish
# any database connections. See the worker
# for more details.
FORK_WORKER = !ENV["QC_FORK_WORKER"].nil?

# Defer method calls on the QC module to the
# default queue. This facilitates QC.enqueue()
def self.method_missing(sym, *args, &block)
Expand Down
11 changes: 9 additions & 2 deletions lib/queue_classic/conn.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@

module QC
class Conn
# Number of seconds to block on the listen chanel for new jobs.
WAIT_TIME = (ENV["QC_LISTEN_TIME"] || 5).to_i
# You can use the APP_NAME to query for
# postgres related process information in the
# pg_stat_activity table.
APP_NAME = ENV["QC_APP_NAME"] || "queue_classic"


def self.connect
QC.log(:at => "establish_conn")
Expand All @@ -15,7 +22,7 @@ def self.connect
if !Conf.debug?
conn.exec("SET client_min_messages TO 'warning'")
end
conn.exec("SET application_name = '#{QC::APP_NAME}'")
conn.exec("SET application_name = '#{APP_NAME}'")
conn
end

Expand Down Expand Up @@ -51,7 +58,7 @@ def reconnect
end

def disconnect
begin @c.finish
begin @c && @c.finish
ensure @c = nil
end
end
Expand Down
71 changes: 0 additions & 71 deletions lib/queue_classic/pool.rb

This file was deleted.

29 changes: 18 additions & 11 deletions lib/queue_classic/queue.rb
Original file line number Diff line number Diff line change
@@ -1,28 +1,35 @@
require 'queue_classic'
require 'queue_classic/pool'
require 'queue_classic/conn'
require 'json'

module QC
class Queue
TABLE_NAME = "queue_classic_jobs"
# Each row in the table will have a column that
# notes the queue.
QUEUE_NAME = ENV["QUEUE"] || "default"
# Set this to 1 for strict FIFO.
TOP_BOUND = (ENV["QC_TOP_BOUND"] || 9).to_i

attr_reader :pool, :name, :top_bound

attr_reader :conn, :name, :top_bound
def initialize(opts={})
@pool = opts[:pool] || Pool.new
@name = opts[:name] || QC::QUEUE
@top_bound = opts[:top_bound] || QC::TOP_BOUND
@conn = opts[:conn] || Conn.new
@name = opts[:name] || QUEUE_NAME
@top_bound = opts[:top_bound] || TOP_BOUND
end

def enqueue(method, *args)
QC.log_yield(:measure => 'queue.enqueue') do
s="INSERT INTO #{TABLE_NAME} (q_name, method, args) VALUES ($1, $2, $3)"
res = @pool.use {|c| c.execute(s, name, method, JSON.dump(args))}
res = conn.execute(s, name, method, JSON.dump(args))
end
end

def lock
QC.log_yield(:measure => 'queue.lock') do
s = "SELECT * FROM lock_head($1, $2)"
if r = @pool.use {|c| c.execute(s, name, top_bound)}
if r = conn.execute(s, name, top_bound)
{:id => r["id"],
:method => r["method"],
:args => JSON.parse(r["args"])}
Expand All @@ -32,28 +39,28 @@ def lock

def wait
QC.log_yield(:measure => 'queue.wait') do
@pool.use {|c| c.wait(name)}
conn.wait(name)
end
end

def delete(id)
QC.log_yield(:measure => 'queue.delete') do
s = "DELETE FROM #{TABLE_NAME} where id = $1"
@pool.use {|c| c.execute(s, id)}
conn.execute(s, id)
end
end

def delete_all
QC.log_yield(:measure => 'queue.delete_all') do
s = "DELETE FROM #{TABLE_NAME} WHERE q_name = $1"
@pool.use {|c| c.execute(s, name)}
conn.execute(s, name)
end
end

def count
QC.log_yield(:measure => 'queue.count') do
s = "SELECT COUNT(*) FROM #{TABLE_NAME} WHERE q_name = $1"
r = @pool.use {|c| c.execute(s, name)}
r = conn.execute(s, name)
r["count"].to_i
end
end
Expand Down
22 changes: 9 additions & 13 deletions lib/queue_classic/setup.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
require 'queue_classic/pool'
require 'queue_classic/conn'

module QC
module Setup
Expand All @@ -7,20 +7,16 @@ module Setup
CreateTable = File.join(Root, "/sql/create_table.sql")
DropSqlFunctions = File.join(Root, "/sql/drop_ddl.sql")

def self.create(pool=nil)
pool ||= Pool.new
pool.use do |c|
c.execute(File.read(CreateTable))
c.execute(File.read(SqlFunctions))
end
def self.create(conn=nil)
conn ||= Conn.new
conn.execute(File.read(CreateTable))
conn.execute(File.read(SqlFunctions))
end

def self.drop(pool=nil)
pool ||= Pool.new
pool.use do |c|
c.execute("DROP TABLE IF EXISTS queue_classic_jobs CASCADE")
c.execute(File.read(DropSqlFunctions))
end
def self.drop(conn=nil)
conn ||= Conn.new
conn.execute("DROP TABLE IF EXISTS queue_classic_jobs CASCADE")
conn.execute(File.read(DropSqlFunctions))
end
end
end
24 changes: 14 additions & 10 deletions lib/queue_classic/worker.rb
Original file line number Diff line number Diff line change
@@ -1,24 +1,28 @@
require 'thread'
require 'queue_classic'
require 'queue_classic/queue'
require 'queue_classic/conn'

module QC
class Worker
# Set this variable if you wish for
# the worker to fork a UNIX process for
# each locked job. Remember to re-establish
# any database connections. See the worker
# for more details.
FORK_WORKER = !ENV["QC_FORK_WORKER"].nil?
# The worker is capable of processing many jobs at a time.
# It uses FORK(2) to accomplish parallel processing. CONCURRENCY
# is used to set an uppoer bound on how many worker processes can
# run concurrently.
CONCURRENCY = Integer(ENV["QC_CONCURRENCY"] || 1)

attr_accessor :queue, :running
# In the case no arguments are passed to the initializer,
# the defaults are pulled from the environment variables.
def initialize(args={})
@fork_worker = args[:fork_worker] || QC::FORK_WORKER
@limiter = SizedQueue.new(args[:concurrency] || 1)
name = args[:q_name] || QC::QUEUE
@pool = args[:pool] || Pool.new(Integer(args[:max_conns] || 1))
@queue = QC::Queue.new(
:pool => @pool,
:name => name,
:top_bound => args[:top_bound]
)
@fork_worker = args[:fork_worker] || FORK_WORKER || (CONCURRENCY > 1)
@limiter = SizedQueue.new(args[:concurrency] || CONCURRENCY)
@queue = args[:queue] || QC.default_queue
log(args.merge(:at => "worker_initialized"))
@running = true
end
Expand Down
16 changes: 5 additions & 11 deletions test/benchmark_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,25 @@ class BenchmarkTest < QCTest
def test_enqueue
n = 10_000
start = Time.now
n.times do
QC.enqueue("1.odd?", [])
end
n.times {QC.enqueue("1.odd?")}
assert_equal(n, QC.count)

elapsed = Time.now - start
assert_in_delta(4, elapsed, 1)
end

def test_dequeue
pool = QC::Pool.new
queue = QC::Queue.new(:pool => pool)
queue = QC::Queue.new
worker = QC::Worker.new(:concurrency => 4, :queue => queue)
worker.running = true
queue.delete_all
n = 20
n.times do
queue.enqueue("puts", "hello")
end

n.times {queue.enqueue("puts", "hello")}
assert_equal(n, queue.count)

start = Time.now
n.times.map {worker.fork_and_work}.map(&:join)
elapsed = Time.now - start

elapsed = Time.now - start
assert_equal(0, queue.count)
assert_in_delta(10, elapsed, 3)
end
Expand Down
10 changes: 5 additions & 5 deletions test/helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@
class QCTest < Minitest::Test

def init_db
p = QC::Pool.new
QC::Setup.drop(p)
QC::Setup.create(p)
p.use {|c| c.execute(File.read('./test/helper.sql'))}
p.drain!
c = QC::Conn.new
QC::Setup.drop(c)
QC::Setup.create(c)
c.execute(File.read('./test/helper.sql'))
c.disconnect
end

def capture_debug_output
Expand Down
18 changes: 0 additions & 18 deletions test/pool_test.rb

This file was deleted.

Loading

0 comments on commit ab0785d

Please sign in to comment.