Skip to content
78 changes: 65 additions & 13 deletions lib/rage/fiber_scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ class Rage::FiberScheduler
def initialize
@root_fiber = Fiber.current
@dns_cache = {}

@fiber_timeouts = Hash.new { |h, k| h[k] = {} }
end

def io_wait(io, events, timeout = nil)
f = Fiber.current
::Iodine::Scheduler.attach(io.fileno, events, timeout&.ceil) { |err| f.resume(err) }
::Iodine::Scheduler.attach(io.fileno, events, timeout&.ceil) { |err| f.resume(err) if f.alive? }

err = Fiber.defer(io.fileno)
if err == false || (err && err < 0)
Expand Down Expand Up @@ -66,19 +68,24 @@ def kernel_sleep(duration = nil)
Fiber.pause if duration.nil? || duration < 1
end

# TODO: GC works a little strange with this closure;
#
# def timeout_after(duration, exception_class = Timeout::Error, *exception_arguments, &block)
# fiber, block_status = Fiber.current, :running
# ::Iodine.run_after((duration * 1000).to_i) do
# fiber.raise(exception_class, exception_arguments) if block_status == :running
# end
def timeout_after(duration, exception_class = Timeout::Error, *exception_arguments, &block)
f = Fiber.current
timeout = Process.clock_gettime(Process::CLOCK_MONOTONIC) + duration

@fiber_timeouts[f][timeout] = {
exception_class: exception_class,
exception_arguments: exception_arguments
}

# result = block.call
# block_status = :finished
schedule_timeout_check

# result
# end
begin
block.call
ensure
@fiber_timeouts[f].delete(timeout)
@fiber_timeouts.delete(f) if @fiber_timeouts[f].empty?
end
end

def address_resolve(hostname)
@dns_cache[hostname] ||= begin
Expand All @@ -97,7 +104,7 @@ def block(_blocker, timeout = nil)
unless fulfilled
fulfilled = true
::Iodine.defer { ::Iodine.unsubscribe(channel) }
f.resume
f.resume if f.alive?
end
end

Expand Down Expand Up @@ -145,4 +152,49 @@ def fiber(&block)
def close
::Iodine::Scheduler.close
end

private

def schedule_timeout_check
return if @fiber_timeouts.empty?

closest_timeout = nil
@fiber_timeouts.each_value do |timeouts|
timeouts.each_key do |timeout|
closest_timeout = timeout if closest_timeout.nil? || timeout < closest_timeout
end
end

return unless closest_timeout

now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
delay_ms = ((closest_timeout - now) * 1000).ceil
delay_ms = 0 if delay_ms < 0

::Iodine.run_after(delay_ms) do
check_timeouts
schedule_timeout_check
end
end

def check_timeouts
fibers_to_raise = []

@fiber_timeouts.each do |fiber, timeouts|
timeouts.each do |timeout, context|
next false if Process.clock_gettime(Process::CLOCK_MONOTONIC) < timeout

fibers_to_raise << -> do
fiber.raise(context[:exception_class], *context[:exception_arguments])

Iodine.unsubscribe(fiber.__block_channel)
Iodine.unsubscribe(fiber.__await_channel)
end
end
end

fibers_to_raise.each(&:call)

fibers_to_raise.clear
end
end