Skip to content

Commit cffa420

Browse files
committed
Add Batch function
This commit adds a `Batch` function that allows reads and writes to the FSM to contain multiple `Command` implementations. This works nicely as a response to a `Handler` as it provides the transactionality where the success of once command produces 2 or more. Signed-off-by: David Bond <[email protected]>
1 parent 1a4b4a4 commit cffa420

File tree

3 files changed

+67
-17
lines changed

3 files changed

+67
-17
lines changed

go.sum

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,6 @@ go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC
7474
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
7575
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
7676
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
77-
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
78-
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
7977
golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w=
8078
golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
8179
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=

pgfsm.go

Lines changed: 55 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"errors"
2121
"fmt"
2222
"log/slog"
23-
"time"
2423
)
2524

2625
type (
@@ -95,26 +94,38 @@ func (fsm *FSM) Handle(handler Handler) {
9594
// Write a Command to the FSM. This Command will be encoded using the Encoding implementation and stored within the
9695
// database, where it can then be read and the relevant Handler invoked.
9796
func (fsm *FSM) Write(ctx context.Context, cmd Command) error {
98-
fsm.options.logger.
99-
With(slog.String("command_kind", cmd.Kind())).
100-
InfoContext(ctx, "writing command")
101-
10297
return transaction(ctx, fsm.db, func(ctx context.Context, tx *sql.Tx) error {
103-
return insert(ctx, tx, fsm.options.encoder, cmd)
98+
switch command := cmd.(type) {
99+
case batchCommand:
100+
for _, cmd = range command {
101+
fsm.options.logger.
102+
With(slog.String("command_kind", cmd.Kind())).
103+
InfoContext(ctx, "writing command")
104+
105+
if err := insert(ctx, tx, fsm.options.encoder, cmd); err != nil {
106+
return err
107+
}
108+
}
109+
default:
110+
fsm.options.logger.
111+
With(slog.String("command_kind", cmd.Kind())).
112+
InfoContext(ctx, "writing command")
113+
114+
return insert(ctx, tx, fsm.options.encoder, cmd)
115+
}
116+
117+
return nil
104118
})
105119
}
106120

107121
// Read commands from the FSM. For each command read, the relevant Handler implementation will be invoked. This method
108122
// blocks until the provided context is cancelled, or a Handler implementation returns an error.
109123
func (fsm *FSM) Read(ctx context.Context) error {
110-
ticker := time.NewTicker(time.Second / 10)
111-
defer ticker.Stop()
112-
113124
for {
114125
select {
115126
case <-ctx.Done():
116127
return ctx.Err()
117-
case <-ticker.C:
128+
default:
118129
if err := fsm.next(ctx); err != nil {
119130
return err
120131
}
@@ -154,10 +165,26 @@ func (fsm *FSM) next(ctx context.Context) error {
154165
}
155166

156167
if cmd != nil {
157-
log.With(slog.String("received_command_kind", cmd.Kind())).
158-
InfoContext(ctx, "received additional command")
159168

160-
if err = insert(ctx, tx, fsm.options.encoder, cmd); err != nil {
169+
switch command := cmd.(type) {
170+
case batchCommand:
171+
for _, cmd = range command {
172+
log.With(slog.String("received_command_kind", cmd.Kind())).
173+
InfoContext(ctx, "received additional command")
174+
175+
if err = insert(ctx, tx, fsm.options.encoder, cmd); err != nil {
176+
return err
177+
}
178+
}
179+
180+
default:
181+
log.With(slog.String("received_command_kind", cmd.Kind())).
182+
InfoContext(ctx, "received additional command")
183+
184+
err = insert(ctx, tx, fsm.options.encoder, cmd)
185+
}
186+
187+
if err != nil {
161188
return err
162189
}
163190
}
@@ -249,3 +276,18 @@ func UseEncoding(e Encoding) Option {
249276
func UseLogger(l *slog.Logger) Option {
250277
return func(o *options) { o.logger = l.WithGroup("pgfsm") }
251278
}
279+
280+
type (
281+
batchCommand []Command
282+
)
283+
284+
func (cmd batchCommand) Kind() string {
285+
return ""
286+
}
287+
288+
// Batch returns a single Command implementation that wraps multiple other Command implementations. This can be used
289+
// to return multiple commands at once when returning from a CommandHandler function. Or to send multiple commands to
290+
// the FSM at once using FSM.Write.
291+
func Batch(commands ...Command) Command {
292+
return batchCommand(commands)
293+
}

pgfsm_test.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,23 +69,33 @@ func TestFSM_ReadWrite(t *testing.T) {
6969

7070
fsm.Handle(pgfsm.CommandHandler[TestCommandA](func(ctx context.Context, a TestCommandA) (pgfsm.Command, error) {
7171
handledA = true
72+
7273
return TestCommandB{Foo: a.Foo + 1}, nil
7374
}))
7475

7576
fsm.Handle(pgfsm.CommandHandler[TestCommandB](func(ctx context.Context, b TestCommandB) (pgfsm.Command, error) {
7677
handledB = true
77-
return TestCommandC{Foo: b.Foo + 1}, nil
78+
79+
return pgfsm.Batch(
80+
TestCommandC{Foo: b.Foo + 1},
81+
TestCommandC{Foo: b.Foo + 1},
82+
), nil
7883
}))
7984

8085
fsm.Handle(pgfsm.CommandHandler[TestCommandC](func(ctx context.Context, c TestCommandC) (pgfsm.Command, error) {
8186
handledC = true
8287
assert.EqualValues(t, 3, c.Foo)
88+
8389
return nil, nil
8490
}))
8591

8692
group, ctx := errgroup.WithContext(ctx)
8793
group.Go(func() error {
88-
return fsm.Write(ctx, TestCommandA{Foo: 1})
94+
return fsm.Write(ctx, pgfsm.Batch(
95+
TestCommandA{Foo: 1},
96+
TestCommandA{Foo: 1},
97+
TestCommandA{Foo: 1},
98+
))
8999
})
90100

91101
group.Go(func() error {

0 commit comments

Comments
 (0)