Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ruby's thread-safe queue #3

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 13 additions & 23 deletions lib/redis_pool/connection_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,15 @@ class ConnectionQueue
def initialize(max_size = 0, &block)
@create_block = block
@created = 0
@queue = []
@queue = Queue.new
@max_size = max_size
@lock = Monitor.new
@lock_cond = @lock.new_cond
end

##
# Adds (or returns) a connection to the available queue, synchronously.
#
def add(element)
synchronize do
@queue.push element
@lock_cond.signal
end
@queue.push element
end
alias << add
alias push add
Expand All @@ -40,18 +35,14 @@ def add(element)
def poll(timeout = 5)
t0 = Concurrent.monotonic_time
elapsed = 0
synchronize do
loop do
return get_connection if connection_available?

connection = create_connection
return connection if connection
loop do
return get_connection if connection_available?

elapsed = Concurrent.monotonic_time - t0
raise TimeoutError, 'could not obtain connection' if elapsed >= timeout
connection = create_connection
return connection if connection

@lock_cond.wait(timeout - elapsed)
end
elapsed = Concurrent.monotonic_time - t0
raise TimeoutError, 'could not obtain connection' if elapsed >= timeout
end
end
alias pop poll
Expand All @@ -61,9 +52,12 @@ def poll(timeout = 5)
# synchronously.
#
def delete(element)
synchronize do
@queue.delete element
new_queue = Queue.new
while [email protected]
current = @queue.pop(non_block=true)
new_queue << current if current != element
end
@queue = new_queue
end

##
Expand All @@ -85,10 +79,6 @@ def available_to_create

private

def synchronize(&block)
@lock.synchronize(&block)
end

def connection_available?
[email protected]?
end
Expand Down