Skip to content

Commit

Permalink
IWF-410: emit RCP_EXECUTION_EVENT
Browse files Browse the repository at this point in the history
  • Loading branch information
John Bowers committed Dec 16, 2024
1 parent 1bed2dc commit 948bac0
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 3 deletions.
3 changes: 3 additions & 0 deletions gen/iwfidl/api/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3342,6 +3342,7 @@ components:
- WORKFLOW_COMPLETE_EVENT
- WORKFLOW_FAIL_EVENT
- WORKFLOW_START_EVENT
- RPC_EXECUTION_EVENT
type: string
IwfEvent:
properties:
Expand All @@ -3357,6 +3358,8 @@ components:
type: string
stateExecutionId:
type: string
rpcName:
type: string
startTimestampInMs:
format: int64
type: integer
Expand Down
2 changes: 2 additions & 0 deletions gen/iwfidl/docs/EventType.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@

* `WORKFLOW_START_EVENT` (value: `"WORKFLOW_START_EVENT"`)

* `RPC_EXECUTION_EVENT` (value: `"RPC_EXECUTION_EVENT"`)


[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md)

Expand Down
26 changes: 26 additions & 0 deletions gen/iwfidl/docs/IwfEvent.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Name | Type | Description | Notes
**WorkflowRunId** | **string** | |
**StateId** | Pointer to **string** | | [optional]
**StateExecutionId** | Pointer to **string** | | [optional]
**RpcName** | Pointer to **string** | | [optional]
**StartTimestampInMs** | Pointer to **int64** | | [optional]
**EndTimestampInMs** | Pointer to **int64** | | [optional]

Expand Down Expand Up @@ -162,6 +163,31 @@ SetStateExecutionId sets StateExecutionId field to given value.

HasStateExecutionId returns a boolean if a field has been set.

### GetRpcName

`func (o *IwfEvent) GetRpcName() string`

GetRpcName returns the RpcName field if non-nil, zero value otherwise.

### GetRpcNameOk

`func (o *IwfEvent) GetRpcNameOk() (*string, bool)`

GetRpcNameOk returns a tuple with the RpcName field if it's non-nil, zero value otherwise
and a boolean to check if the value has been set.

### SetRpcName

`func (o *IwfEvent) SetRpcName(v string)`

SetRpcName sets RpcName field to given value.

### HasRpcName

`func (o *IwfEvent) HasRpcName() bool`

HasRpcName returns a boolean if a field has been set.

### GetStartTimestampInMs

`func (o *IwfEvent) GetStartTimestampInMs() int64`
Expand Down
2 changes: 2 additions & 0 deletions gen/iwfidl/model_event_type.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 36 additions & 0 deletions gen/iwfidl/model_iwf_event.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion iwf-idl
Submodule iwf-idl updated 1 files
+4 −1 iwf.yaml
14 changes: 13 additions & 1 deletion service/api/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/indeedeng/iwf/config"
"github.com/indeedeng/iwf/service/common/event"
"github.com/indeedeng/iwf/service/interpreter/env"
"net/http"
"os"
Expand Down Expand Up @@ -612,7 +613,9 @@ func (s *serviceImpl) ApiV1WorkflowSearchPost(
func (s *serviceImpl) ApiV1WorkflowRpcPost(
ctx context.Context, req iwfidl.WorkflowRpcRequest,
) (wresp *iwfidl.WorkflowRpcResponse, retError *errors.ErrorAndStatus) {
defer func() { log.CapturePanic(recover(), s.logger, &retError) }()
defer func() {
log.CapturePanic(recover(), s.logger, &retError)
}()

if needLocking(req) {
return s.handleRpcBySynchronousUpdate(ctx, req)
Expand All @@ -628,6 +631,15 @@ func (s *serviceImpl) ApiV1WorkflowRpcPost(
return nil, s.handleError(err, WorkflowRpcApiPath, req.GetWorkflowId())
}

defer func() {
event.Handle(iwfidl.IwfEvent{
EventType: iwfidl.RPC_EXECUTION_EVENT,
RpcName: &req.RpcName,
WorkflowType: rpcPrep.IwfWorkflowType,
WorkflowId: req.GetWorkflowId(),
})
}()

resp, retError := rpc.InvokeWorkerRpc(ctx, rpcPrep, req, s.config.Api.MaxWaitSeconds)
if retError != nil {
return nil, retError
Expand Down
15 changes: 14 additions & 1 deletion service/interpreter/workflowUpdater.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package interpreter
import (
"github.com/indeedeng/iwf/gen/iwfidl"
"github.com/indeedeng/iwf/service"
"github.com/indeedeng/iwf/service/common/event"
"time"
)

Expand Down Expand Up @@ -52,11 +53,22 @@ func NewWorkflowUpdater(
func (u *WorkflowUpdater) handler(
ctx UnifiedContext, input iwfidl.WorkflowRpcRequest,
) (output *HandlerOutput, err error) {

u.continueAsNewer.IncreaseInflightOperation()
defer u.continueAsNewer.DecreaseInflightOperation()

info := u.provider.GetWorkflowInfo(ctx)

defer func() {
if !u.provider.IsReplaying(ctx) {
event.Handle(iwfidl.IwfEvent{
EventType: iwfidl.RPC_EXECUTION_EVENT,
RpcName: &input.RpcName,
WorkflowType: u.basicInfo.IwfWorkflowType,
WorkflowId: info.WorkflowExecution.ID,
})
}
}()

rpcPrep := service.PrepareRpcQueryResponse{
DataObjects: u.persistenceManager.LoadDataObjects(ctx, input.DataAttributesLoadingPolicy),
SearchAttributes: u.persistenceManager.LoadSearchAttributes(ctx, input.SearchAttributesLoadingPolicy),
Expand Down Expand Up @@ -88,6 +100,7 @@ func (u *WorkflowUpdater) handler(
handlerOutput := &HandlerOutput{
StatusError: activityOutput.StatusError,
}

rpcOutput := activityOutput.RpcOutput
if rpcOutput != nil {
handlerOutput.RpcOutput = &iwfidl.WorkflowRpcResponse{
Expand Down

0 comments on commit 948bac0

Please sign in to comment.