Huge feature pack: Deadlock detection, prioritized futures, smart up/down spinning of workers, logging #23

Status: Open. Wants to merge 25 commits into master.
Conversation

@ggPeti (Contributor) commented Sep 29, 2014

Apologies for the giant PR, but one thing led to another, and it didn't make sense to implement one feature without the others (well, logging could have been separated, but it was very useful for debugging while implementing the rest).

The features implemented are:

Deadlock detection

There are two kinds of deadlocks that we can detect.

1. Circular dependency

When futures depend on each other in a circle, Futuroscope will detect that and send each thread involved a DeadlockError with a message describing the situation. For example:

2.1.0 :001 > f2 = nil
 => nil 
2.1.0 :002 > f1 = future { f2 = future { f1.future_value }; f2.future_value }; f1.inspect
Futuroscope::DeadlockError: Cyclical dependency detected, the future was aborted.
    (...stack trace...)
2.1.0 :004 > f2.inspect
Futuroscope::DeadlockError: Cyclical dependency detected, the future was aborted.
    (...stack trace...)
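The cycle check above boils down to walking a dependency map until you revisit a node. Here is an illustrative sketch (not the PR's actual code) of that idea, where threads point at the futures they wait on and futures point back at the threads evaluating them; all names are hypothetical:

```ruby
# Walk the dependency chain from `start`; if we ever revisit a node,
# the dependencies form a circle and the futures can never resolve.
def cyclical_dependency?(dependencies, start)
  seen = [start]
  node = start
  while (node = dependencies[node])
    return true if seen.include?(node)
    seen << node
  end
  false
end

# thread t1 waits on future f1, and f1 is being evaluated on t1 itself:
deps = { :thread_t1 => :future_f1, :future_f1 => :thread_t1 }
cyclical_dependency?(deps, :thread_t1) # => true
```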
2. Pool size too low

When the pool is full but every future is waiting on another future that doesn't have a worker yet, Futuroscope will abort the lowest-priority future among those that do not depend on any other future. (Note: this selection could be optimized to pick the root of the smallest unary branch of the dependency forest instead of the root of the smallest tree, but this is a good enough solution for a situation that rarely occurs.)
For example:

2.1.0 :001 > Futuroscope.default_pool = Futuroscope::Pool.new 1..1
 => #<Futuroscope::Pool:0x000001020adef0 @min_workers=1, @max_workers=1, @dependencies={}, @priorities={}, @future_needs_worker=#<Thread::ConditionVariable:0x000001020ade78>, @workers=#<Set: {#<Futuroscope::Worker:0x000001020add88 @pool=#<Futuroscope::Pool:0x000001020adef0 ...>, @free=true, @thread=#<Thread:0x000001020add38 sleep>>}>, @mutex=#<Mutex:0x000001020addb0>, @futures={}> 
2.1.0 :002 > future { future { 1 } + 1 }
Futuroscope::DeadlockError: Pool size is too low, the future was aborted.
    (...stack trace...)
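The victim-selection rule described above can be sketched in a few lines. This is an illustration with hypothetical names, not the PR's implementation: among the futures that do not depend on any other future, abort the one with the lowest priority.

```ruby
# `priorities` maps futures to their priority; `depends_on` maps a
# future to the future it is blocked on (absent key = independent).
def pick_victim(priorities, depends_on)
  independent = priorities.keys.reject { |f| depends_on.key?(f) }
  independent.min_by { |f| priorities[f] }
end

priorities = { :f1 => 2, :f2 => 0, :f3 => 1 }
depends_on = { :f1 => :f2 } # f1 is blocked waiting on f2
pick_victim(priorities, depends_on) # => :f2
```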

Prioritized futures

Instead of a queue, Futuroscope now uses something resembling a priority queue, where a future's priority is the number of threads directly or indirectly blocking on its value. This can avoid deadlock situations, for example:

2.1.0 :001 > Futuroscope.default_pool = Futuroscope::Pool.new 2..2
 => #<Futuroscope::Pool:0x000001018c2470 @min_workers=2, @max_workers=2, @dependencies={}, @priorities={}, @future_needs_worker=#<Thread::ConditionVariable:0x000001018c23f8>, @workers=#<Set: {#<Futuroscope::Worker:0x000001018c2308 @pool=#<Futuroscope::Pool:0x000001018c2470 ...>, @free=true, @thread=#<Thread:0x000001018c22e0 sleep>>, #<Futuroscope::Worker:0x000001018c2150 @pool=#<Futuroscope::Pool:0x000001018c2470 ...>, @free=true, @thread=#<Thread:0x000001018c2128 sleep>>}>, @mutex=#<Mutex:0x000001018c2358>, @futures={}> 
2.1.0 :002 > f1, f2, f3, f4 = future { sleep 1; f4.future_value }, future { sleep 3 }, future { sleep 1; f4.future_value }, future { 4 }
 => [4, 3, 4, 4]

In the old implementation, this would raise a fatal error: f1 starts to block on f4 before f4 gets to the queue, then f3 starts to block on f4 as well once f2 is done, so the pool fills up with blocking futures before f4 has a chance to be evaluated.
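The priority bookkeeping can be sketched as follows. This is a minimal illustration with hypothetical structure, not the PR's actual pool: each blocked thread bumps the priority of the future it waits on, and pop always hands a worker the highest-priority future.

```ruby
priorities = Hash.new(0)

# One more thread is now (directly or indirectly) waiting on `future`.
def block_on(priorities, future)
  priorities[future] += 1
end

# Hand out the future the most threads are waiting on.
def pop_highest(priorities)
  future, = priorities.max_by { |_, prio| prio }
  priorities.delete(future)
  future
end

block_on(priorities, :f4) # f1 blocks on f4
block_on(priorities, :f4) # f3 blocks on f4
priorities[:f2] = 0       # f2 is queued but nobody waits on it
pop_highest(priorities)   # => :f4, so f4 runs before f2
```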

Smart up/down spinning of workers

Previously, with every pushed future there was a 50% chance that a new worker would be spun up if the limit hadn't been reached yet. Now the workers keep track of whether they are free or busy, and the pool only spins up a new worker if it has more futures without workers than free workers.
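The spin-up rule just described reduces to a single comparison. A sketch with illustrative names:

```ruby
# Only add a worker when more futures lack a worker than there are
# free workers, and only while the pool is below its maximum size.
def spin_up_needed?(futures_without_worker, free_workers, worker_count, max_workers)
  worker_count < max_workers && futures_without_worker > free_workers
end

spin_up_needed?(3, 1, 4, 10)  # => true: 3 waiting futures, only 1 free worker
spin_up_needed?(1, 2, 4, 10)  # => false: enough free workers already
spin_up_needed?(3, 0, 10, 10) # => false: pool is at its maximum
```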

Previously, every thread also quit immediately if the pool was over the minimum thread count. Now, if the pool has more workers than the minimum, each extra worker gets a 2-second linger period in which it will pick up new work if any is available; only if no new work comes in does it die. This minimizes the cost of spinning threads up and down when work arrives in close spikes.
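The linger behaviour can be sketched with a ConditionVariable timed wait. This is a hypothetical structure, not the PR's worker; the PR uses a 2-second linger, which is a parameter here only so the sketch stays easy to exercise:

```ruby
# Drain jobs; when the queue is empty, wait up to `linger` seconds for
# new work to be signalled before giving up and dying.
def worker_loop(jobs, mutex, work_available, linger = 2)
  loop do
    job = nil
    mutex.synchronize do
      work_available.wait(mutex, linger) if jobs.empty?
      job = jobs.shift
    end
    return :lingered_and_died if job.nil? # no work arrived in time
    job.call
  end
end

results = []
jobs = [-> { results << 1 }, -> { results << 2 }]
worker_loop(jobs, Mutex.new, ConditionVariable.new, 0.05) # => :lingered_and_died
results # => [1, 2]
```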

This also leads to better parallelization in some cases. Old version:

2.1.2 :001 > require 'benchmark'
 => true 
2.1.2 :002 > Futuroscope.default_pool = Futuroscope::Pool.new 1..10
 => #<Futuroscope::Pool:0x007fe502131840 @min_workers=1, @max_workers=10, @queue=#<Thread::Queue:0x007fe5021317f0>, @workers=#<Set: {#<Futuroscope::Worker:0x007fe5021316d8 @pool=#<Futuroscope::Pool:0x007fe502131840 ...>, @thread=#<Thread:0x007fe502131688 sleep>>}>, @mutex=#<Mutex:0x007fe502131700>> 
2.1.2 :004 > Benchmark.measure { (1..10).future_map { sleep 1 }.inspect }.real
 => 2.002055 

New version:

2.1.0 :001 > require 'benchmark'
 => true 
2.1.0 :002 > Futuroscope.default_pool = Futuroscope::Pool.new 1..10
 => #<Futuroscope::Pool:0x0000010189afb0 @min_workers=1, @max_workers=10, @dependencies={}, @priorities={}, @future_needs_worker=#<Thread::ConditionVariable:0x0000010189af38>, @workers=#<Set: {#<Futuroscope::Worker:0x0000010189ae20 @pool=#<Futuroscope::Pool:0x0000010189afb0 ...>, @free=true, @thread=#<Thread:0x0000010189adf8 sleep>>}>, @mutex=#<Mutex:0x0000010189ae70>, @futures={}> 
2.1.0 :003 > Benchmark.measure { (1..10).future_map { sleep 1 }.inspect }.real
 => 1.002627 

Logging

Futuroscope now supports logging. If you put loggers into Futuroscope.loggers (a plain array), they will receive all log messages from inside Futuroscope. The loggers are expected to conform to Ruby's core Logger interface, i.e. respond to these messages: debug, info, warn, error, and fatal.

Example:

2.1.0 :001 > require 'logger'
 => true 
2.1.0 :002 > Futuroscope.loggers << Logger.new(STDERR)
 => [#<Logger:0x00000102887f90 @progname=nil, @level=0, @default_formatter=#<Logger::Formatter:0x00000102887f68 @datetime_format=nil>, @formatter=nil, @logdev=#<Logger::LogDevice:0x00000102887f18 @shift_size=nil, @shift_age=nil, @filename=nil, @dev=#<IO:<STDERR>>, @mutex=#<Logger::LogDevice::LogDeviceMutex:0x00000102887ef0 @mon_owner=nil, @mon_count=0, @mon_mutex=#<Mutex:0x00000102887ea0>>>>] 
2.1.0 :003 > Futuroscope.default_pool = Futuroscope::Pool.new 1..1
I, [2014-09-29T16:50:17.214206 #85319]  INFO -- :         spun up worker with thread 2168719860
 => #<Futuroscope::Pool:0x00000102881690 @min_workers=1, @max_workers=1, @dependencies={}, @priorities={}, @future_needs_worker=#<Thread::ConditionVariable:0x00000102881500>, @workers=#<Set: {#<Futuroscope::Worker:0x00000102881410 @pool=#<Futuroscope::Pool:0x00000102881690 ...>, @free=true, @thread=#<Thread:0x000001028813e8 sleep>>}>, @mutex=#<Mutex:0x00000102881438>, @futures={}> 
I, [2014-09-29T16:50:17.214845 #85319]  INFO -- : POP:    thread 2168719860 going to sleep until there's something to do...
2.1.0 :004 > f = future { f.future_value }
I, [2014-09-29T16:50:25.741545 #85319]  INFO -- : PUSH:   added future 2168704840
I, [2014-09-29T16:50:25.741620 #85319]  INFO -- :         sending signal to wake up a thread
D, [2014-09-29T16:50:25.741673 #85319] DEBUG -- :         current priorities: {"future 2168704840"=>0}
I, [2014-09-29T16:50:25.741800 #85319]  INFO -- : DEPEND: thread 2156059640 depends on future 2168704840
D, [2014-09-29T16:50:25.741861 #85319] DEBUG -- :         current dependencies: {"thread 2156059640"=>"future 2168704840"}
I, [2014-09-29T16:50:25.741962 #85319]  INFO -- :         incrementing priority for future 2168704840
I, [2014-09-29T16:50:25.742025 #85319]  INFO -- : POP:    ... thread 2168719860 woke up
D, [2014-09-29T16:50:25.742069 #85319] DEBUG -- :         current priorities: {"future 2168704840"=>1}
D, [2014-09-29T16:50:25.742110 #85319] DEBUG -- :         current future workers: {"future 2168704840"=>nil}
I, [2014-09-29T16:50:25.742156 #85319]  INFO -- : POP:    thread 2168719860 will start working on future 2168704840
I, [2014-09-29T16:50:25.742203 #85319]  INFO -- : DEPEND: thread 2168719860 depends on future 2168704840
D, [2014-09-29T16:50:25.742266 #85319] DEBUG -- :         current dependencies: {"thread 2156059640"=>"future 2168704840", "thread 2168719860"=>"future 2168704840"}
E, [2014-09-29T16:50:25.742313 #85319] ERROR -- :         deadlock! cyclical dependency, sending interrupt to all threads involved
I, [2014-09-29T16:50:25.742388 #85319]  INFO -- : DONE:   thread 2168719860 is done with future 2168704840
I, [2014-09-29T16:50:25.742423 #85319]  INFO -- :         deleting future 2168704840 from the task list
I, [2014-09-29T16:50:25.742462 #85319]  INFO -- :         deleting dependency from thread 2156059640 to future 2168704840
I, [2014-09-29T16:50:25.742498 #85319]  INFO -- :         deleting dependency from thread 2168719860 to future 2168704840
I, [2014-09-29T16:50:25.742548 #85319]  INFO -- : POP:    thread 2168719860 going to sleep until there's something to do...
Futuroscope::DeadlockError: Cyclical dependency detected, the future was aborted.
    (...stack trace...)
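The fan-out pattern that Futuroscope.loggers enables is simple: every logger in the array receives every message. A self-contained sketch, where `loggers` is a plain local array and any object responding to debug/info/warn/error/fatal would do:

```ruby
require 'logger'
require 'stringio'

buffer  = StringIO.new
loggers = [Logger.new(buffer), Logger.new(STDERR)]

# Send one message to every registered logger.
def log_info(loggers, message)
  loggers.each { |logger| logger.info(message) }
end

log_info(loggers, "spun up worker")
buffer.string.include?("spun up worker") # => true
```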

I'd also like to note that this PR comes with several engineering decisions worth discussing. For example, I've moved away from using a Queue in both the pool and the future; instead I now use ConditionVariables directly (which Queue uses internally as well).

Another is that the pool keys its hashes by the futures' __id__s, using two hashes instead of one: one mapping future.__id__ to a priority value, and one mapping future.__id__ to the actual future. Why? Because if you use a Delegator as a hash key, Ruby calls #hash on the object, which gets forwarded to the wrapped object, creating a deadlock. Not forwarding #hash is no solution either: it would make the futures non-transparent, so hsh[:key] would not be the same as hsh[future { :key }].
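The two-hash scheme can be sketched as below. This is an illustration (a SimpleDelegator stands in for a future): keying by __id__ means the pool never calls #hash on a future, so a delegating, possibly unresolved future can't deadlock the bookkeeping.

```ruby
require 'delegate'

priorities = {} # future.__id__ => priority value
futures    = {} # future.__id__ => the future object itself

future = SimpleDelegator.new(:not_resolved_yet) # stand-in for a future
priorities[future.__id__] = 0
futures[future.__id__]    = future

# Look the future back up by its __id__; #hash is never invoked on it.
futures.fetch(future.__id__).__id__ == future.__id__ # => true
```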

Please discuss any parts you disagree with; I'm happy to elaborate, and of course it's always possible that I missed something.

@@ -20,4 +21,17 @@ def self.default_pool
def self.default_pool=(pool)
@default_pool = pool
end

# Gets the current loggers. Add objects to it that have the below methods defined to log on them.


Line is too long. [99/80]

end
end



Extra blank line detected.

Thread.handle_interrupt(DeadlockError => :immediate) do
@resolved_future = { value: @block.call }
end
rescue ::Exception => e


Avoid rescuing the Exception class.

@txus (Member) commented Feb 17, 2015

@ggPeti this is.... amazing. I say we drop support for 1.9.3 and only support 2+, the latest Rbx (compatible with 2+), and in the future JRuby 9000. Mind changing the CI settings to make your PR pass? I'll merge as soon as it's green. Thank you SO much for all these features! :)

@ggPeti (Contributor, Author) commented Feb 17, 2015

Thanks for reviewing it! I will get back to this in the coming days. Rbx didn't have asynchronous thread messaging implemented at the time of my PR; I will check whether they've implemented it since. If not, I will see if I can implement it myself. This PR makes heavy use of handle_interrupt, so we definitely need it.
