Skip to content

Commit abb9d03

Browse files
authored
flatten send and set event arguments (#62)
- Flatten send and set event inputs - Simplify get /recv/setevent/send inputs - Fix Sleep and Cancel interface signature to accept the context and match the package level methods ( 😢 )
1 parent 230d4dd commit abb9d03

File tree

7 files changed

+93
-176
lines changed

7 files changed

+93
-176
lines changed

README.md

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -241,15 +241,12 @@ For example, build a reliable billing workflow that durably waits for a notifica
241241

242242
```golang
243243
func sendWorkflow(ctx dbos.DBOSContext, message string) (string, error) {
244-
err := dbos.Send(ctx, dbos.WorkflowSendInput[string]{
245-
DestinationID: "receiverID",
246-
Topic: "topic",
247-
Message: message,
248-
})
244+
err := dbos.Send(ctx, "receiverID", message, "topic")
245+
return "sent", err
249246
}
250247

251248
func receiveWorkflow(ctx dbos.DBOSContext, topic string) (string, error) {
252-
return dbos.Recv[string](ctx, dbos.WorkflowRecvInput{Topic: topic, Timeout: 48 * time.Hour})
249+
return dbos.Recv[string](ctx, topic, 48 * time.Hour)
253250
}
254251

255252
// Start a receiver in the background

dbos/admin_server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -412,7 +412,7 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
412412
workflowID := r.PathValue("id")
413413
ctx.logger.Info("Cancelling workflow", "workflow_id", workflowID)
414414

415-
err := ctx.CancelWorkflow(workflowID)
415+
err := ctx.CancelWorkflow(ctx, workflowID)
416416
if err != nil {
417417
ctx.logger.Error("Failed to cancel workflow", "workflow_id", workflowID, "error", err)
418418
http.Error(w, fmt.Sprintf("Failed to cancel workflow: %v", err), http.StatusInternalServerError)

dbos/dbos.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -77,18 +77,18 @@ type DBOSContext interface {
7777
// Workflow operations
7878
RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) (any, error) // Execute a function as a durable step within a workflow
7979
RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) // Start a new workflow execution
80-
Send(_ DBOSContext, input WorkflowSendInput) error // Send a message to another workflow
81-
Recv(_ DBOSContext, input WorkflowRecvInput) (any, error) // Receive a message sent to this workflow
82-
SetEvent(_ DBOSContext, input WorkflowSetEventInput) error // Set a key-value event for this workflow
83-
GetEvent(_ DBOSContext, input WorkflowGetEventInput) (any, error) // Get a key-value event from a target workflow
84-
Sleep(duration time.Duration) (time.Duration, error) // Durable sleep that survives workflow recovery
80+
Send(_ DBOSContext, destinationID string, message any, topic string) error // Send a message to another workflow
81+
Recv(_ DBOSContext, topic string, timeout time.Duration) (any, error) // Receive a message sent to this workflow
82+
SetEvent(_ DBOSContext, key string, message any) error // Set a key-value event for this workflow
83+
GetEvent(_ DBOSContext, targetWorkflowID string, key string, timeout time.Duration) (any, error) // Get a key-value event from a target workflow
84+
Sleep(_ DBOSContext, duration time.Duration) (time.Duration, error) // Durable sleep that survives workflow recovery
8585
GetWorkflowID() (string, error) // Get the current workflow ID (only available within workflows)
8686
GetStepID() (int, error) // Get the current step ID (only available within workflows)
8787

8888
// Workflow management
8989
RetrieveWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Get a handle to an existing workflow
9090
Enqueue(_ DBOSContext, params EnqueueOptions) (WorkflowHandle[any], error) // Enqueue a new workflow with parameters
91-
CancelWorkflow(workflowID string) error // Cancel a workflow by setting its status to CANCELLED
91+
CancelWorkflow(_ DBOSContext, workflowID string) error // Cancel a workflow by setting its status to CANCELLED
9292
ResumeWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Resume a cancelled workflow
9393
ForkWorkflow(_ DBOSContext, input ForkWorkflowInput) (WorkflowHandle[any], error) // Fork a workflow from a specific step
9494
ListWorkflows(opts ...ListWorkflowsOption) ([]WorkflowStatus, error) // List workflows based on filtering criteria

dbos/serialization_test.go

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ func setEventUserDefinedTypeWorkflow(ctx DBOSContext, input string) (string, err
212212
},
213213
}
214214

215-
err := SetEvent(ctx, GenericWorkflowSetEventInput[UserDefinedEventData]{Key: input, Message: eventData})
215+
err := SetEvent(ctx, input, eventData)
216216
if err != nil {
217217
return "", err
218218
}
@@ -236,11 +236,7 @@ func TestSetEventSerialize(t *testing.T) {
236236
assert.Equal(t, "user-defined-event-set", result)
237237

238238
// Retrieve the event to verify it was properly serialized and can be deserialized
239-
retrievedEvent, err := GetEvent[UserDefinedEventData](executor, WorkflowGetEventInput{
240-
TargetWorkflowID: setHandle.GetWorkflowID(),
241-
Key: "user-defined-key",
242-
Timeout: 3 * time.Second,
243-
})
239+
retrievedEvent, err := GetEvent[UserDefinedEventData](executor, setHandle.GetWorkflowID(), "user-defined-key", 3 * time.Second)
244240
require.NoError(t, err)
245241

246242
// Verify the retrieved data matches what we set
@@ -268,12 +264,7 @@ func sendUserDefinedTypeWorkflow(ctx DBOSContext, destinationID string) (string,
268264
}
269265

270266
// Send should automatically register this type with gob
271-
// Note the explicit type parameter since compiler cannot infer UserDefinedEventData from string input
272-
err := Send(ctx, GenericWorkflowSendInput[UserDefinedEventData]{
273-
DestinationID: destinationID,
274-
Topic: "user-defined-topic",
275-
Message: sendData,
276-
})
267+
err := Send(ctx, destinationID, sendData, "user-defined-topic")
277268
if err != nil {
278269
return "", err
279270
}
@@ -282,10 +273,7 @@ func sendUserDefinedTypeWorkflow(ctx DBOSContext, destinationID string) (string,
282273

283274
func recvUserDefinedTypeWorkflow(ctx DBOSContext, input string) (UserDefinedEventData, error) {
284275
// Receive the user-defined type message
285-
result, err := Recv[UserDefinedEventData](ctx, WorkflowRecvInput{
286-
Topic: "user-defined-topic",
287-
Timeout: 3 * time.Second,
288-
})
276+
result, err := Recv[UserDefinedEventData](ctx, "user-defined-topic", 3 * time.Second)
289277
return result, err
290278
}
291279

dbos/system_database.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,9 @@ type systemDatabase interface {
5050

5151
// Communication (special steps)
5252
send(ctx context.Context, input WorkflowSendInput) error
53-
recv(ctx context.Context, input WorkflowRecvInput) (any, error)
53+
recv(ctx context.Context, input recvInput) (any, error)
5454
setEvent(ctx context.Context, input WorkflowSetEventInput) error
55-
getEvent(ctx context.Context, input WorkflowGetEventInput) (any, error)
55+
getEvent(ctx context.Context, input getEventInput) (any, error)
5656

5757
// Timers (special steps)
5858
sleep(ctx context.Context, duration time.Duration) (time.Duration, error)
@@ -642,7 +642,6 @@ func (s *sysDB) listWorkflows(ctx context.Context, input listWorkflowsDBInput) (
642642
wf.Error = errors.New(*errorStr)
643643
}
644644

645-
// XXX maybe set wf.Output to outputString and run deserialize out of system DB
646645
wf.Output, err = deserialize(outputString)
647646
if err != nil {
648647
return nil, fmt.Errorf("failed to deserialize output: %w", err)
@@ -1511,7 +1510,7 @@ func (s *sysDB) send(ctx context.Context, input WorkflowSendInput) error {
15111510
}
15121511

15131512
// Recv is a special type of step that receives a message destined for a given workflow
1514-
func (s *sysDB) recv(ctx context.Context, input WorkflowRecvInput) (any, error) {
1513+
func (s *sysDB) recv(ctx context.Context, input recvInput) (any, error) {
15151514
functionName := "DBOS.recv"
15161515

15171516
// Get workflow state from context
@@ -1734,7 +1733,7 @@ func (s *sysDB) setEvent(ctx context.Context, input WorkflowSetEventInput) error
17341733
return nil
17351734
}
17361735

1737-
func (s *sysDB) getEvent(ctx context.Context, input WorkflowGetEventInput) (any, error) {
1736+
func (s *sysDB) getEvent(ctx context.Context, input getEventInput) (any, error) {
17381737
functionName := "DBOS.getEvent"
17391738

17401739
// Get workflow state from context (optional for GetEvent as we can get an event from outside a workflow)

dbos/workflow.go

Lines changed: 46 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -794,6 +794,7 @@ func (c *dbosContext) RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, o
794794

795795
// If the afterFunc has started, the workflow was cancelled and the status should be set to cancelled
796796
if stopFunc != nil && !stopFunc() {
797+
c.logger.Info("Workflow was cancelled. Waiting for cancel function to complete", "workflow_id", workflowID)
797798
// Wait for the cancel function to complete
798799
// Note this must happen before we write on the outcome channel (and signal the handler's GetResult)
799800
<-cancelFuncCompleted
@@ -817,7 +818,7 @@ func (c *dbosContext) RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, o
817818
close(outcomeChan)
818819
}()
819820

820-
return newWorkflowHandle[any](uncancellableCtx, workflowID, outcomeChan), nil
821+
return newWorkflowHandle(uncancellableCtx, workflowID, outcomeChan), nil
821822
}
822823

823824
/******************************/
@@ -1082,50 +1083,43 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption)
10821083
/******* WORKFLOW COMMUNICATIONS ********/
10831084
/****************************************/
10841085

1085-
// GenericWorkflowSendInput defines the parameters for sending a message to another workflow.
1086-
type GenericWorkflowSendInput[P any] struct {
1087-
DestinationID string // Workflow ID to send the message to
1088-
Message P // Message payload (must be gob-encodable)
1089-
Topic string // Optional topic for message filtering
1090-
}
1091-
1092-
func (c *dbosContext) Send(_ DBOSContext, input WorkflowSendInput) error {
1093-
return c.systemDB.send(c, input)
1086+
func (c *dbosContext) Send(_ DBOSContext, destinationID string, message any, topic string) error {
1087+
return c.systemDB.send(c, WorkflowSendInput{
1088+
DestinationID: destinationID,
1089+
Message: message,
1090+
Topic: topic,
1091+
})
10941092
}
10951093

10961094
// Send sends a message to another workflow with type safety.
1097-
// The message type R is automatically registered for gob encoding.
1095+
// The message type P is automatically registered for gob encoding.
10981096
//
10991097
// Send can be called from within a workflow (as a durable step) or from outside workflows.
11001098
// When called within a workflow, the send operation becomes part of the workflow's durable state.
11011099
//
11021100
// Example:
11031101
//
1104-
// err := dbos.Send(ctx, dbos.GenericWorkflowSendInput[string]{
1105-
// DestinationID: "target-workflow-id",
1106-
// Message: "Hello from sender",
1107-
// Topic: "notifications",
1108-
// })
1109-
func Send[P any](ctx DBOSContext, input GenericWorkflowSendInput[P]) error {
1102+
// err := dbos.Send(ctx, "target-workflow-id", "Hello from sender", "notifications")
1103+
func Send[P any](ctx DBOSContext, destinationID string, message P, topic string) error {
11101104
if ctx == nil {
11111105
return errors.New("ctx cannot be nil")
11121106
}
11131107
var typedMessage P
11141108
gob.Register(typedMessage)
1115-
return ctx.Send(ctx, WorkflowSendInput{
1116-
DestinationID: input.DestinationID,
1117-
Message: input.Message,
1118-
Topic: input.Topic,
1119-
})
1109+
return ctx.Send(ctx, destinationID, message, topic)
11201110
}
11211111

1122-
// WorkflowRecvInput defines the parameters for receiving messages sent to this workflow.
1123-
type WorkflowRecvInput struct {
1112+
// recvInput defines the parameters for receiving messages sent to this workflow.
1113+
type recvInput struct {
11241114
Topic string // Topic to listen for (empty string receives from default topic)
11251115
Timeout time.Duration // Maximum time to wait for a message
11261116
}
11271117

1128-
func (c *dbosContext) Recv(_ DBOSContext, input WorkflowRecvInput) (any, error) {
1118+
func (c *dbosContext) Recv(_ DBOSContext, topic string, timeout time.Duration) (any, error) {
1119+
input := recvInput{
1120+
Topic: topic,
1121+
Timeout: timeout,
1122+
}
11291123
return c.systemDB.recv(c, input)
11301124
}
11311125

@@ -1138,20 +1132,17 @@ func (c *dbosContext) Recv(_ DBOSContext, input WorkflowRecvInput) (any, error)
11381132
//
11391133
// Example:
11401134
//
1141-
// message, err := dbos.Recv[string](ctx, dbos.WorkflowRecvInput{
1142-
// Topic: "notifications",
1143-
// Timeout: 30 * time.Second,
1144-
// })
1135+
// message, err := dbos.Recv[string](ctx, "notifications", 30 * time.Second)
11451136
// if err != nil {
11461137
// // Handle timeout or error
11471138
// return err
11481139
// }
11491140
// log.Printf("Received: %s", message)
1150-
func Recv[R any](ctx DBOSContext, input WorkflowRecvInput) (R, error) {
1141+
func Recv[R any](ctx DBOSContext, topic string, timeout time.Duration) (R, error) {
11511142
if ctx == nil {
11521143
return *new(R), errors.New("ctx cannot be nil")
11531144
}
1154-
msg, err := ctx.Recv(ctx, input)
1145+
msg, err := ctx.Recv(ctx, topic, timeout)
11551146
if err != nil {
11561147
return *new(R), err
11571148
}
@@ -1167,49 +1158,45 @@ func Recv[R any](ctx DBOSContext, input WorkflowRecvInput) (R, error) {
11671158
return typedMessage, nil
11681159
}
11691160

1170-
// GenericWorkflowSetEventInput defines the parameters for setting a workflow event.
1171-
type GenericWorkflowSetEventInput[P any] struct {
1172-
Key string // Event key identifier
1173-
Message P // Event value (must be gob-encodable)
1174-
}
1175-
1176-
func (c *dbosContext) SetEvent(_ DBOSContext, input WorkflowSetEventInput) error {
1177-
return c.systemDB.setEvent(c, input)
1161+
func (c *dbosContext) SetEvent(_ DBOSContext, key string, message any) error {
1162+
return c.systemDB.setEvent(c, WorkflowSetEventInput{
1163+
Key: key,
1164+
Message: message,
1165+
})
11781166
}
11791167

11801168
// SetEvent sets a key-value event for the current workflow with type safety.
11811169
// Events are persistent and can be retrieved by other workflows using GetEvent.
1182-
// The event type R is automatically registered for gob encoding.
1170+
// The event type P is automatically registered for gob encoding.
11831171
//
11841172
// SetEvent can only be called from within a workflow and becomes part of the workflow's durable state.
11851173
// Setting an event with the same key will overwrite the previous value.
11861174
//
11871175
// Example:
11881176
//
1189-
// err := dbos.SetEvent(ctx, dbos.GenericWorkflowSetEventInput[string]{
1190-
// Key: "status",
1191-
// Message: "processing-complete",
1192-
// })
1193-
func SetEvent[P any](ctx DBOSContext, input GenericWorkflowSetEventInput[P]) error {
1177+
// err := dbos.SetEvent(ctx, "status", "processing-complete")
1178+
func SetEvent[P any](ctx DBOSContext, key string, message P) error {
11941179
if ctx == nil {
11951180
return errors.New("ctx cannot be nil")
11961181
}
11971182
var typedMessage P
11981183
gob.Register(typedMessage)
1199-
return ctx.SetEvent(ctx, WorkflowSetEventInput{
1200-
Key: input.Key,
1201-
Message: input.Message,
1202-
})
1184+
return ctx.SetEvent(ctx, key, message)
12031185
}
12041186

1205-
// WorkflowGetEventInput defines the parameters for retrieving an event from a workflow.
1206-
type WorkflowGetEventInput struct {
1187+
// getEventInput defines the parameters for retrieving an event from a workflow.
1188+
type getEventInput struct {
12071189
TargetWorkflowID string // Workflow ID to get the event from
12081190
Key string // Event key to retrieve
12091191
Timeout time.Duration // Maximum time to wait for the event to be set
12101192
}
12111193

1212-
func (c *dbosContext) GetEvent(_ DBOSContext, input WorkflowGetEventInput) (any, error) {
1194+
func (c *dbosContext) GetEvent(_ DBOSContext, targetWorkflowID string, key string, timeout time.Duration) (any, error) {
1195+
input := getEventInput{
1196+
TargetWorkflowID: targetWorkflowID,
1197+
Key: key,
1198+
Timeout: timeout,
1199+
}
12131200
return c.systemDB.getEvent(c, input)
12141201
}
12151202

@@ -1221,21 +1208,17 @@ func (c *dbosContext) GetEvent(_ DBOSContext, input WorkflowGetEventInput) (any,
12211208
//
12221209
// Example:
12231210
//
1224-
// status, err := dbos.GetEvent[string](ctx, dbos.WorkflowGetEventInput{
1225-
// TargetWorkflowID: "target-workflow-id",
1226-
// Key: "status",
1227-
// Timeout: 30 * time.Second,
1228-
// })
1211+
// status, err := dbos.GetEvent[string](ctx, "target-workflow-id", "status", 30 * time.Second)
12291212
// if err != nil {
12301213
// // Handle timeout or error
12311214
// return err
12321215
// }
12331216
// log.Printf("Status: %s", status)
1234-
func GetEvent[R any](ctx DBOSContext, input WorkflowGetEventInput) (R, error) {
1217+
func GetEvent[R any](ctx DBOSContext, targetWorkflowID string, key string, timeout time.Duration) (R, error) {
12351218
if ctx == nil {
12361219
return *new(R), errors.New("ctx cannot be nil")
12371220
}
1238-
value, err := ctx.GetEvent(ctx, input)
1221+
value, err := ctx.GetEvent(ctx, targetWorkflowID, key, timeout)
12391222
if err != nil {
12401223
return *new(R), err
12411224
}
@@ -1250,7 +1233,7 @@ func GetEvent[R any](ctx DBOSContext, input WorkflowGetEventInput) (R, error) {
12501233
return typedValue, nil
12511234
}
12521235

1253-
func (c *dbosContext) Sleep(duration time.Duration) (time.Duration, error) {
1236+
func (c *dbosContext) Sleep(_ DBOSContext, duration time.Duration) (time.Duration, error) {
12541237
return c.systemDB.sleep(c, duration)
12551238
}
12561239

@@ -1269,7 +1252,7 @@ func Sleep(ctx DBOSContext, duration time.Duration) (time.Duration, error) {
12691252
if ctx == nil {
12701253
return 0, errors.New("ctx cannot be nil")
12711254
}
1272-
return ctx.Sleep(duration)
1255+
return ctx.Sleep(ctx, duration)
12731256
}
12741257

12751258
/***********************************/
@@ -1550,7 +1533,7 @@ func Enqueue[P any, R any](ctx DBOSContext, params GenericEnqueueOptions[P]) (Wo
15501533
// - workflowID: The unique identifier of the workflow to cancel
15511534
//
15521535
// Returns an error if the workflow does not exist or if the cancellation operation fails.
1553-
func (c *dbosContext) CancelWorkflow(workflowID string) error {
1536+
func (c *dbosContext) CancelWorkflow(_ DBOSContext, workflowID string) error {
15541537
return c.systemDB.cancelWorkflow(c, workflowID)
15551538
}
15561539

@@ -1573,7 +1556,7 @@ func CancelWorkflow(ctx DBOSContext, workflowID string) error {
15731556
if ctx == nil {
15741557
return errors.New("ctx cannot be nil")
15751558
}
1576-
return ctx.CancelWorkflow(workflowID)
1559+
return ctx.CancelWorkflow(ctx, workflowID)
15771560
}
15781561

15791562
func (c *dbosContext) ResumeWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) {

0 commit comments

Comments
 (0)