diff --git a/lib/rage/fiber_scheduler.rb b/lib/rage/fiber_scheduler.rb index abc651ea..271751e0 100644 --- a/lib/rage/fiber_scheduler.rb +++ b/lib/rage/fiber_scheduler.rb @@ -8,11 +8,13 @@ class Rage::FiberScheduler def initialize @root_fiber = Fiber.current @dns_cache = {} + + @fiber_timeouts = Hash.new { |h, k| h[k] = {} } end def io_wait(io, events, timeout = nil) f = Fiber.current - ::Iodine::Scheduler.attach(io.fileno, events, timeout&.ceil) { |err| f.resume(err) } + ::Iodine::Scheduler.attach(io.fileno, events, timeout&.ceil) { |err| f.resume(err) if f.alive? } err = Fiber.defer(io.fileno) if err == false || (err && err < 0) @@ -66,19 +68,24 @@ def kernel_sleep(duration = nil) Fiber.pause if duration.nil? || duration < 1 end - # TODO: GC works a little strange with this closure; - # - # def timeout_after(duration, exception_class = Timeout::Error, *exception_arguments, &block) - # fiber, block_status = Fiber.current, :running - # ::Iodine.run_after((duration * 1000).to_i) do - # fiber.raise(exception_class, exception_arguments) if block_status == :running - # end + def timeout_after(duration, exception_class = Timeout::Error, *exception_arguments, &block) + f = Fiber.current + timeout = Process.clock_gettime(Process::CLOCK_MONOTONIC) + duration + + @fiber_timeouts[f][timeout] = { + exception_class: exception_class, + exception_arguments: exception_arguments + } - # result = block.call - # block_status = :finished + schedule_timeout_check - # result - # end + begin + block.call + ensure + @fiber_timeouts[f].delete(timeout) + @fiber_timeouts.delete(f) if @fiber_timeouts[f].empty? + end + end def address_resolve(hostname) @dns_cache[hostname] ||= begin @@ -97,7 +104,7 @@ def block(_blocker, timeout = nil) unless fulfilled fulfilled = true ::Iodine.defer { ::Iodine.unsubscribe(channel) } - f.resume + f.resume if f.alive? end end @@ -145,4 +152,49 @@ def fiber(&block) def close ::Iodine::Scheduler.close end + + private + + def schedule_timeout_check + return if @fiber_timeouts.empty? + + closest_timeout = nil + @fiber_timeouts.each_value do |timeouts| + timeouts.each_key do |timeout| + closest_timeout = timeout if closest_timeout.nil? || timeout < closest_timeout + end + end + + return unless closest_timeout + + now = Process.clock_gettime(Process::CLOCK_MONOTONIC) + delay_ms = ((closest_timeout - now) * 1000).ceil + delay_ms = 0 if delay_ms < 0 + + ::Iodine.run_after(delay_ms) do + check_timeouts + schedule_timeout_check + end + end + + def check_timeouts + fibers_to_raise = [] + + @fiber_timeouts.each do |fiber, timeouts| + timeouts.each do |timeout, context| + next false if Process.clock_gettime(Process::CLOCK_MONOTONIC) < timeout + + fibers_to_raise << -> do + fiber.raise(context[:exception_class], *context[:exception_arguments]) + + Iodine.unsubscribe(fiber.__block_channel) + Iodine.unsubscribe(fiber.__await_channel) + end + end + end + + fibers_to_raise.each(&:call) + + fibers_to_raise.clear + end end