Skip to content

Commit c9c0494

Browse files
committed
Dynamic poll interval
This commit modifies how the `Read` method polls records in the `command` table. Previously, it simply hammered the database as frequently as possible to find new commands. This isn't great as we're effectively choking the database for any other operations that may occur to do zero work. Now, I've added a dynamic backoff starting at a minimum value and capping out at a maximum. The interval is doubled until it reaches the maximum. This allows the FSM to chill out a bit against the database at times when there are no commands being written to the command table. Signed-off-by: David Bond <[email protected]>
1 parent 71ba45f commit c9c0494

File tree

2 files changed

+39
-4
lines changed

2 files changed

+39
-4
lines changed

pgfsm.go

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"errors"
2121
"fmt"
2222
"log/slog"
23+
"time"
2324
)
2425

2526
type (
@@ -142,24 +143,42 @@ type (
142143
// intended to be used as a single entrypoint for commands. This method should use a type switch on the pointer types
143144
// of your commands and react accordingly.
144145
func (fsm *FSM) Read(ctx context.Context, h Handler) error {
146+
interval := fsm.options.minPollInterval
147+
timer := time.NewTimer(interval)
148+
defer timer.Stop()
149+
145150
for {
146151
select {
147152
case <-ctx.Done():
148153
return ctx.Err()
149-
default:
150-
if err := fsm.next(ctx, h); err != nil {
154+
case <-timer.C:
155+
err := fsm.next(ctx, h)
156+
switch {
157+
case errors.Is(err, errNoCommands):
158+
interval = min(interval*2, fsm.options.maxPollInterval)
159+
timer.Reset(interval)
160+
case err != nil:
151161
return err
162+
default:
163+
interval = fsm.options.minPollInterval
164+
timer.Reset(interval)
152165
}
153166
}
154167
}
155168
}
156169

170+
var (
171+
errNoCommands = errors.New("no commands")
172+
)
173+
157174
func (fsm *FSM) next(ctx context.Context, h Handler) error {
175+
fsm.options.logger.DebugContext(ctx, "checking for new commands")
176+
158177
return transaction(ctx, fsm.db, func(ctx context.Context, tx *sql.Tx) error {
159178
id, kind, data, err := next(ctx, tx)
160179
switch {
161180
case errors.Is(err, sql.ErrNoRows):
162-
return nil
181+
return errNoCommands
163182
case err != nil:
164183
return err
165184
}
@@ -221,6 +240,8 @@ type (
221240
skipUnknownCommands bool
222241
encoder Encoding
223242
logger *slog.Logger
243+
minPollInterval time.Duration
244+
maxPollInterval time.Duration
224245
}
225246

226247
// The Option type is a function that can modify the behaviour of the FSM.
@@ -232,6 +253,8 @@ func defaultOptions() options {
232253
skipUnknownCommands: false,
233254
encoder: &JSON{},
234255
logger: slog.New(slog.DiscardHandler),
256+
minPollInterval: time.Millisecond,
257+
maxPollInterval: time.Second * 5,
235258
}
236259
}
237260

@@ -253,6 +276,17 @@ func UseEncoding(e Encoding) Option {
253276
return func(o *options) { o.encoder = e }
254277
}
255278

279+
// PollInterval is an Option implementation that configures the minimum and maximum frequency at which Command implementations
280+
// will be read from the database. Each time the FSM checks for commands and finds none, it will half the frequency at
281+
// which it checks up to the maximum value. This is done to prevent excessive load on the database at times where there
282+
// are few commands being written. The default values are 1ms and 5s.
283+
func PollInterval(min, max time.Duration) Option {
284+
return func(o *options) {
285+
o.minPollInterval = min
286+
o.maxPollInterval = max
287+
}
288+
}
289+
256290
// UseLogger is an Option implementation that modifies the logger used by the FSM. By default, the FSM uses
257291
// slog.DiscardHandler and will not write any logs.
258292
func UseLogger(l *slog.Logger) Option {

pgfsm_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ func TestFSM_ReadWrite(t *testing.T) {
5555
pgfsm.SkipUnknownCommands(),
5656
pgfsm.UseEncoding(&pgfsm.GOB{}),
5757
pgfsm.UseLogger(logger),
58+
pgfsm.PollInterval(time.Millisecond, time.Minute),
5859
)
5960
require.NoError(t, err)
6061

@@ -64,7 +65,7 @@ func TestFSM_ReadWrite(t *testing.T) {
6465
handledC bool
6566
)
6667

67-
ctx, cancel := context.WithTimeout(t.Context(), time.Second*10)
68+
ctx, cancel := context.WithTimeout(t.Context(), time.Second*5)
6869
defer cancel()
6970

7071
pgfsm.RegisterCommand[TestCommandA](fsm)

0 commit comments

Comments
 (0)