Skip to content

Commit b0c4434

Browse files
committed
Introduce Loop abstraction.
1 parent 89ec3d1 commit b0c4434

File tree

3 files changed

+88
-8
lines changed

3 files changed

+88
-8
lines changed
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
module Async
2+
module Container
3+
module Supervisor
4+
# A robust loop that executes a block at regular intervals.
5+
#
6+
# If an error occurs during the execution of the block, it is logged and the loop continues
7+
# (if `robust` is true). Otherwise, the error is raised and the loop stops.
8+
#
9+
# @parameter interval [Integer] The interval in seconds between executions of the block.
10+
# @parameter robust [Boolean] Whether to continue the loop on errors.
11+
class Loop
12+
def self.run(...)
13+
self.new.tap do |loop|
14+
loop.run
15+
end
16+
end
17+
18+
def initialize(interval: 60, robust: true, skip: true, &block)
19+
@interval = interval
20+
@robust = robust
21+
@skip = skip
22+
@block = block
23+
end
24+
25+
def running?
26+
@running
27+
end
28+
29+
# Stop on the next iteration of the loop.
30+
def stop!
31+
@running = false
32+
end
33+
34+
def run
35+
if @running
36+
raise RuntimeError, "Loop is already running!"
37+
end
38+
39+
@running = true
40+
while @running
41+
clock = Async::Clock.start!
42+
43+
begin
44+
@block.call(self)
45+
rescue => error
46+
if @robust
47+
Console.error(self, "Loop error:", error)
48+
else
49+
raise error
50+
end
51+
end
52+
53+
break unless @running
54+
55+
remaining = @interval - clock.elapsed
56+
57+
if remaining <= 0
58+
Console.warn(self, "Loop is behind schedule by #{-remaining.round(2)} seconds.", interval: @interval)
59+
60+
# We are behind schedule, so we can catch up:
61+
if @skip
62+
# Remaining will now be positive
63+
remaining = remaining % @interval
64+
end
65+
end
66+
67+
if remaining.positive?
68+
sleep(remaining)
69+
end
70+
end
71+
ensure
72+
@running = false
73+
end
74+
end
75+
end
76+
end
77+
end

lib/async/container/supervisor/memory_monitor.rb

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
require "memory/leak/cluster"
77
require "set"
88

9+
require_relative "loop"
10+
911
module Async
1012
module Container
1113
module Supervisor
@@ -103,8 +105,12 @@ def memory_leak_detected(process_id, monitor)
103105
end
104106

105107
# Kill the process gently:
106-
Console.info(self, "Killing process!", child: {process_id: process_id})
107-
Process.kill(:INT, process_id)
108+
begin
109+
Console.info(self, "Killing process!", child: {process_id: process_id})
110+
Process.kill(:INT, process_id)
111+
rescue => error
112+
Console.warn(self, "Failed to kill process!", child: {process_id: process_id}, exception: error)
113+
end
108114

109115
true
110116
end
@@ -114,7 +120,7 @@ def memory_leak_detected(process_id, monitor)
114120
# @returns [Async::Task] The task that is running the memory monitor.
115121
def run
116122
Async do
117-
while true
123+
Loop.run(interval: @interval) do
118124
# This block must return true if the process was killed.
119125
@cluster.check! do |process_id, monitor|
120126
Console.error(self, "Memory leak detected!", child: {process_id: process_id}, monitor: monitor)
@@ -125,8 +131,6 @@ def run
125131
Console.error(self, "Failed to handle memory leak!", child: {process_id: process_id}, exception: error)
126132
end
127133
end
128-
129-
sleep(@interval)
130134
end
131135
end
132136
end

lib/async/container/supervisor/process_monitor.rb

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
# Copyright, 2025, by Samuel Williams.
55

66
require "process/metrics"
7+
require_relative "loop"
78

89
module Async
910
module Container
@@ -70,15 +71,13 @@ def status(call)
7071
# @returns [Async::Task] The task that is running the process monitor.
7172
def run
7273
Async do
73-
while true
74+
Loop.run(interval: @interval) do
7475
metrics = self.metrics
7576

7677
# Log each process individually for better searchability in log platforms:
7778
metrics.each do |process_id, general|
7879
Console.info(self, "Process metrics captured.", general: general)
7980
end
80-
81-
sleep(@interval)
8281
end
8382
end
8483
end

0 commit comments

Comments
 (0)