Skip to content

Commit 292479b

Browse files
committed
Rework Read method
This commit reworks the `FSM.Read` method to instead be a single entrypoint using a type switch on the pointer types of each command registered. To me this is a bit more succint than having to perform a rather large registration function. Instead of this per-command: ```go fsm.Handle(pgfsm.CommandHandler[TestCommandA](func(ctx context.Context, a TestCommandA) (pgfsm.Command, error) { return TestCommandB{Foo: a.Foo + 1}, nil })) ``` You now only need to do this: ```go pgfsm.RegisterCommand[TestCommandB](fsm) ``` And add a case to the type switch in the new `Read` method: ```go return fsm.Read(ctx, func(ctx context.Context, cmd any) (pgfsm.Command, error) { switch msg := cmd.(type) { case *TestCommandB: // Handler logic } }) ``` Signed-off-by: David Bond <[email protected]>
1 parent cffa420 commit 292479b

File tree

2 files changed

+74
-93
lines changed

2 files changed

+74
-93
lines changed

pgfsm.go

Lines changed: 52 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,12 @@ type (
2727
// to handle these commands in order. Because a database is being used as the command store, no leadership is
2828
// required among the FSM instances and commands will be processed in order using an ordinal identifier.
2929
//
30-
// Handler implementations should be registered prior to calling FSM.Read to ensure commands can be processed.
30+
// Command implementations should be registered prior to calling FSM.Read using the RegisterCommand function to ensure
31+
// that commands can be processed.
3132
FSM struct {
32-
db *sql.DB
33-
handlers map[string]Handler
34-
options options
33+
db *sql.DB
34+
commandFactories map[string]func() any
35+
options options
3536
}
3637

3738
// The Command interface is used to describe types implemented by package consumers that represent the contents
@@ -79,16 +80,26 @@ func New(db *sql.DB, options ...Option) (*FSM, error) {
7980
}
8081

8182
return &FSM{
82-
db: db,
83-
options: opts,
84-
handlers: make(map[string]Handler),
83+
db: db,
84+
options: opts,
85+
commandFactories: make(map[string]func() any),
8586
}, nil
8687
}
8788

88-
// Handle registers a Handler implementation with the FSM which will be invoked when a Command with a matching kind is
89-
// read from the database. This method should be called with all your handlers prior to calling FSM.Read.
90-
func (fsm *FSM) Handle(handler Handler) {
91-
fsm.handlers[handler.kind()] = handler
89+
// RegisterCommand registers a Command implementation with the FSM. This function must be called for each of your
90+
// Command implementations so that the FSM knows how to decode them. This function's parameterized type must be the
91+
// value of your Command implementation.
92+
//
93+
// For example:
94+
//
95+
// pgfsm.RegisterCommand[MyCommand](fsm)
96+
func RegisterCommand[T Command](fsm *FSM) {
97+
var cmd T
98+
99+
fsm.commandFactories[cmd.Kind()] = func() any {
100+
var out T
101+
return &out
102+
}
92103
}
93104

94105
// Write a Command to the FSM. This Command will be encoded using the Encoding implementation and stored within the
@@ -118,22 +129,31 @@ func (fsm *FSM) Write(ctx context.Context, cmd Command) error {
118129
})
119130
}
120131

121-
// Read commands from the FSM. For each command read, the relevant Handler implementation will be invoked. This method
122-
// blocks until the provided context is cancelled, or a Handler implementation returns an error.
123-
func (fsm *FSM) Read(ctx context.Context) error {
132+
type (
133+
// The Handler type is a function used with the FSM.Read method and is invoked per-command read by the FSM. A
134+
// type switch should be used on the cmd parameter for your individual Command implementations as a pointer of
135+
// the concrete type.
136+
Handler func(ctx context.Context, cmd any) (Command, error)
137+
)
138+
139+
// Read commands from the FSM. For each command read, the provided Handler implementation will be invoked. This method
140+
// blocks until the provided context is cancelled, or the Handler implementation returns an error. The Handler is
141+
// intended to be used as a single entrypoint for commands. This method should use a type switch on the pointer types
142+
// of your commands and react accordingly.
143+
func (fsm *FSM) Read(ctx context.Context, h Handler) error {
124144
for {
125145
select {
126146
case <-ctx.Done():
127147
return ctx.Err()
128148
default:
129-
if err := fsm.next(ctx); err != nil {
149+
if err := fsm.next(ctx, h); err != nil {
130150
return err
131151
}
132152
}
133153
}
134154
}
135155

136-
func (fsm *FSM) next(ctx context.Context) error {
156+
func (fsm *FSM) next(ctx context.Context, h Handler) error {
137157
return transaction(ctx, fsm.db, func(ctx context.Context, tx *sql.Tx) error {
138158
id, kind, data, err := next(ctx, tx)
139159
switch {
@@ -148,7 +168,7 @@ func (fsm *FSM) next(ctx context.Context) error {
148168
slog.Int64("command_id", id),
149169
)
150170

151-
h, ok := fsm.handlers[kind]
171+
factory, ok := fsm.commandFactories[kind]
152172
switch {
153173
case !ok && fsm.options.skipUnknownCommands:
154174
log.WarnContext(ctx, "skipping unknown command")
@@ -157,83 +177,44 @@ func (fsm *FSM) next(ctx context.Context) error {
157177
return UnknownCommandError{Kind: kind}
158178
}
159179

180+
cmd := factory()
181+
if err = fsm.options.encoder.Decode(data, cmd); err != nil {
182+
return fmt.Errorf("failed to decode command %q: %w", kind, err)
183+
}
184+
160185
log.InfoContext(ctx, "handling command")
161-
cmd, err := h.handle(ctx, fsm.options.encoder, data)
186+
returned, err := h(ctx, cmd)
162187
if err != nil {
163188
log.ErrorContext(ctx, "error handling command")
164189
return err
165190
}
166191

167-
if cmd != nil {
168-
169-
switch command := cmd.(type) {
192+
if returned != nil {
193+
switch command := returned.(type) {
170194
case batchCommand:
171-
for _, cmd = range command {
172-
log.With(slog.String("received_command_kind", cmd.Kind())).
195+
for _, batched := range command {
196+
log.With(slog.String("received_command_kind", batched.Kind())).
173197
InfoContext(ctx, "received additional command")
174198

175-
if err = insert(ctx, tx, fsm.options.encoder, cmd); err != nil {
199+
if err = insert(ctx, tx, fsm.options.encoder, batched); err != nil {
176200
return err
177201
}
178202
}
179203

180204
default:
181-
log.With(slog.String("received_command_kind", cmd.Kind())).
205+
log.With(slog.String("received_command_kind", returned.Kind())).
182206
InfoContext(ctx, "received additional command")
183207

184-
err = insert(ctx, tx, fsm.options.encoder, cmd)
185-
}
186-
187-
if err != nil {
188-
return err
208+
if err = insert(ctx, tx, fsm.options.encoder, command); err != nil {
209+
return err
210+
}
189211
}
190212
}
191213

192214
return remove(ctx, tx, id)
193215
})
194216
}
195217

196-
type (
197-
// The Handler interface describes types that can process and decode individual commands read from the
198-
// database. This interface isn't intended to be implemented by consumers of this package. Rather, they
199-
// should wrap their handler function using the NewHandler method, which allows you to maintain type
200-
// safety by setting an Encoding on the FSM.
201-
Handler interface {
202-
handle(context.Context, Encoding, []byte) (Command, error)
203-
kind() string
204-
}
205-
206-
// The CommandHandler type is a function that consumers of this package are expected to implement and register
207-
// with the FSM in order to handle their self-defined Command implementations. This function will be invoked each
208-
// time the FSM finds a Command with a matching kind and allows an optional command to be returned in response
209-
// to the one read.
210-
//
211-
// Returning a nil Command should be used to finish handling a chain of commands. If this function returns an
212-
// error, the read command will remain within the FSM and the error will be propagated up to the FSM.Read method,
213-
// causing the FSM to terminate.
214-
CommandHandler[T Command] func(context.Context, T) (Command, error)
215-
)
216-
217-
func (ch CommandHandler[T]) handle(ctx context.Context, encoder Encoding, p []byte) (Command, error) {
218-
var input T
219-
220-
if err := encoder.Decode(p, &input); err != nil {
221-
return nil, err
222-
}
223-
224-
return ch(ctx, input)
225-
}
226-
227-
func (ch CommandHandler[T]) kind() string {
228-
var cmd T
229-
230-
// Note: Your IDE may complain of a possible nil pointer dereference here. This won't be an issue when Command
231-
// implementations are values. If they are used as pointers, the method will still be called. A nil pointer
232-
// dereference could occur if the Kind implementation attempts to construct the string using member fields but
233-
// really these should always be constants.
234-
return cmd.Kind()
235-
}
236-
237218
type (
238219
options struct {
239220
skipUnknownCommands bool

pgfsm_test.go

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -67,27 +67,9 @@ func TestFSM_ReadWrite(t *testing.T) {
6767
ctx, cancel := context.WithTimeout(t.Context(), time.Second*10)
6868
defer cancel()
6969

70-
fsm.Handle(pgfsm.CommandHandler[TestCommandA](func(ctx context.Context, a TestCommandA) (pgfsm.Command, error) {
71-
handledA = true
72-
73-
return TestCommandB{Foo: a.Foo + 1}, nil
74-
}))
75-
76-
fsm.Handle(pgfsm.CommandHandler[TestCommandB](func(ctx context.Context, b TestCommandB) (pgfsm.Command, error) {
77-
handledB = true
78-
79-
return pgfsm.Batch(
80-
TestCommandC{Foo: b.Foo + 1},
81-
TestCommandC{Foo: b.Foo + 1},
82-
), nil
83-
}))
84-
85-
fsm.Handle(pgfsm.CommandHandler[TestCommandC](func(ctx context.Context, c TestCommandC) (pgfsm.Command, error) {
86-
handledC = true
87-
assert.EqualValues(t, 3, c.Foo)
88-
89-
return nil, nil
90-
}))
70+
pgfsm.RegisterCommand[TestCommandA](fsm)
71+
pgfsm.RegisterCommand[TestCommandB](fsm)
72+
pgfsm.RegisterCommand[TestCommandC](fsm)
9173

9274
group, ctx := errgroup.WithContext(ctx)
9375
group.Go(func() error {
@@ -99,7 +81,25 @@ func TestFSM_ReadWrite(t *testing.T) {
9981
})
10082

10183
group.Go(func() error {
102-
return fsm.Read(ctx)
84+
return fsm.Read(ctx, func(ctx context.Context, cmd any) (pgfsm.Command, error) {
85+
switch msg := cmd.(type) {
86+
case *TestCommandA:
87+
handledA = true
88+
return TestCommandB{Foo: msg.Foo + 1}, nil
89+
case *TestCommandB:
90+
handledB = true
91+
return pgfsm.Batch(
92+
TestCommandC{Foo: msg.Foo + 1},
93+
TestCommandC{Foo: msg.Foo + 1},
94+
), nil
95+
case *TestCommandC:
96+
handledC = true
97+
return nil, nil
98+
default:
99+
assert.Fail(t, "should be skipping unknown commands")
100+
return nil, nil
101+
}
102+
})
103103
})
104104

105105
err = group.Wait()

0 commit comments

Comments
 (0)