Skip to content

Commit 8fffb80

Browse files
bowersj27John Bowers
and
John Bowers
authored
IWF-410: emit RCP_EXECUTION_EVENT (#525)
Co-authored-by: John Bowers <[email protected]>
1 parent 6d54278 commit 8fffb80

File tree

8 files changed

+97
-3
lines changed

8 files changed

+97
-3
lines changed

gen/iwfidl/api/openapi.yaml

+3
Original file line numberDiff line numberDiff line change
@@ -3342,6 +3342,7 @@ components:
33423342
- WORKFLOW_COMPLETE_EVENT
33433343
- WORKFLOW_FAIL_EVENT
33443344
- WORKFLOW_START_EVENT
3345+
- RPC_EXECUTION_EVENT
33453346
type: string
33463347
IwfEvent:
33473348
properties:
@@ -3357,6 +3358,8 @@ components:
33573358
type: string
33583359
stateExecutionId:
33593360
type: string
3361+
rpcName:
3362+
type: string
33603363
startTimestampInMs:
33613364
format: int64
33623365
type: integer

gen/iwfidl/docs/EventType.md

+2
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929

3030
* `WORKFLOW_START_EVENT` (value: `"WORKFLOW_START_EVENT"`)
3131

32+
* `RPC_EXECUTION_EVENT` (value: `"RPC_EXECUTION_EVENT"`)
33+
3234

3335
[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md)
3436

gen/iwfidl/docs/IwfEvent.md

+26
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ Name | Type | Description | Notes
1010
**WorkflowRunId** | **string** | |
1111
**StateId** | Pointer to **string** | | [optional]
1212
**StateExecutionId** | Pointer to **string** | | [optional]
13+
**RpcName** | Pointer to **string** | | [optional]
1314
**StartTimestampInMs** | Pointer to **int64** | | [optional]
1415
**EndTimestampInMs** | Pointer to **int64** | | [optional]
1516

@@ -162,6 +163,31 @@ SetStateExecutionId sets StateExecutionId field to given value.
162163

163164
HasStateExecutionId returns a boolean if a field has been set.
164165

166+
### GetRpcName
167+
168+
`func (o *IwfEvent) GetRpcName() string`
169+
170+
GetRpcName returns the RpcName field if non-nil, zero value otherwise.
171+
172+
### GetRpcNameOk
173+
174+
`func (o *IwfEvent) GetRpcNameOk() (*string, bool)`
175+
176+
GetRpcNameOk returns a tuple with the RpcName field if it's non-nil, zero value otherwise
177+
and a boolean to check if the value has been set.
178+
179+
### SetRpcName
180+
181+
`func (o *IwfEvent) SetRpcName(v string)`
182+
183+
SetRpcName sets RpcName field to given value.
184+
185+
### HasRpcName
186+
187+
`func (o *IwfEvent) HasRpcName() bool`
188+
189+
HasRpcName returns a boolean if a field has been set.
190+
165191
### GetStartTimestampInMs
166192

167193
`func (o *IwfEvent) GetStartTimestampInMs() int64`

gen/iwfidl/model_event_type.go

+2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

gen/iwfidl/model_iwf_event.go

+36
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

iwf-idl

Submodule iwf-idl updated 1 file

service/api/service.go

+13-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"github.com/indeedeng/iwf/config"
7+
"github.com/indeedeng/iwf/service/common/event"
78
"github.com/indeedeng/iwf/service/interpreter/env"
89
"net/http"
910
"os"
@@ -612,7 +613,9 @@ func (s *serviceImpl) ApiV1WorkflowSearchPost(
612613
func (s *serviceImpl) ApiV1WorkflowRpcPost(
613614
ctx context.Context, req iwfidl.WorkflowRpcRequest,
614615
) (wresp *iwfidl.WorkflowRpcResponse, retError *errors.ErrorAndStatus) {
615-
defer func() { log.CapturePanic(recover(), s.logger, &retError) }()
616+
defer func() {
617+
log.CapturePanic(recover(), s.logger, &retError)
618+
}()
616619

617620
if needLocking(req) {
618621
return s.handleRpcBySynchronousUpdate(ctx, req)
@@ -628,6 +631,15 @@ func (s *serviceImpl) ApiV1WorkflowRpcPost(
628631
return nil, s.handleError(err, WorkflowRpcApiPath, req.GetWorkflowId())
629632
}
630633

634+
defer func() {
635+
event.Handle(iwfidl.IwfEvent{
636+
EventType: iwfidl.RPC_EXECUTION_EVENT,
637+
RpcName: &req.RpcName,
638+
WorkflowType: rpcPrep.IwfWorkflowType,
639+
WorkflowId: req.GetWorkflowId(),
640+
})
641+
}()
642+
631643
resp, retError := rpc.InvokeWorkerRpc(ctx, rpcPrep, req, s.config.Api.MaxWaitSeconds)
632644
if retError != nil {
633645
return nil, retError

service/interpreter/workflowUpdater.go

+14-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package interpreter
33
import (
44
"github.com/indeedeng/iwf/gen/iwfidl"
55
"github.com/indeedeng/iwf/service"
6+
"github.com/indeedeng/iwf/service/common/event"
67
"time"
78
)
89

@@ -52,11 +53,22 @@ func NewWorkflowUpdater(
5253
func (u *WorkflowUpdater) handler(
5354
ctx UnifiedContext, input iwfidl.WorkflowRpcRequest,
5455
) (output *HandlerOutput, err error) {
55-
5656
u.continueAsNewer.IncreaseInflightOperation()
5757
defer u.continueAsNewer.DecreaseInflightOperation()
5858

5959
info := u.provider.GetWorkflowInfo(ctx)
60+
61+
defer func() {
62+
if !u.provider.IsReplaying(ctx) {
63+
event.Handle(iwfidl.IwfEvent{
64+
EventType: iwfidl.RPC_EXECUTION_EVENT,
65+
RpcName: &input.RpcName,
66+
WorkflowType: u.basicInfo.IwfWorkflowType,
67+
WorkflowId: info.WorkflowExecution.ID,
68+
})
69+
}
70+
}()
71+
6072
rpcPrep := service.PrepareRpcQueryResponse{
6173
DataObjects: u.persistenceManager.LoadDataObjects(ctx, input.DataAttributesLoadingPolicy),
6274
SearchAttributes: u.persistenceManager.LoadSearchAttributes(ctx, input.SearchAttributesLoadingPolicy),
@@ -88,6 +100,7 @@ func (u *WorkflowUpdater) handler(
88100
handlerOutput := &HandlerOutput{
89101
StatusError: activityOutput.StatusError,
90102
}
103+
91104
rpcOutput := activityOutput.RpcOutput
92105
if rpcOutput != nil {
93106
handlerOutput.RpcOutput = &iwfidl.WorkflowRpcResponse{

0 commit comments

Comments
 (0)