From cb0d67c625963dcb2c67436a43278bace4cdd9d6 Mon Sep 17 00:00:00 2001 From: lwolczynski <54366429+lwolczynski@users.noreply.github.com> Date: Mon, 18 Nov 2024 16:59:31 -0600 Subject: [PATCH] IWF-105: Add event logger (#480) --- gen/iwfidl/.openapi-generator/FILES | 4 + gen/iwfidl/README.md | 2 + gen/iwfidl/api/openapi.yaml | 42 +++ gen/iwfidl/docs/EventType.md | 35 ++ gen/iwfidl/docs/IwfEvent.md | 218 +++++++++++ gen/iwfidl/model_event_type.go | 132 +++++++ gen/iwfidl/model_iwf_event.go | 340 ++++++++++++++++++ iwf-idl | 2 +- service/common/event/event.go | 16 + service/interpreter/activityImpl.go | 52 +++ .../interpreter/cadence/activityProvider.go | 4 + service/interpreter/interfaces.go | 7 +- .../interpreter/temporal/activityProvider.go | 4 + service/interpreter/workflowImpl.go | 156 ++++++-- 14 files changed, 989 insertions(+), 25 deletions(-) create mode 100644 gen/iwfidl/docs/EventType.md create mode 100644 gen/iwfidl/docs/IwfEvent.md create mode 100644 gen/iwfidl/model_event_type.go create mode 100644 gen/iwfidl/model_iwf_event.go create mode 100644 service/common/event/event.go diff --git a/gen/iwfidl/.openapi-generator/FILES b/gen/iwfidl/.openapi-generator/FILES index 6880a271..b9b3e948 100644 --- a/gen/iwfidl/.openapi-generator/FILES +++ b/gen/iwfidl/.openapi-generator/FILES @@ -17,6 +17,7 @@ docs/DefaultApi.md docs/EncodedObject.md docs/ErrorResponse.md docs/ErrorSubStatus.md +docs/EventType.md docs/ExecuteApiFailurePolicy.md docs/ExecutingStateIdMode.md docs/HealthInfo.md @@ -24,6 +25,7 @@ docs/IDReusePolicy.md docs/InterStateChannelCommand.md docs/InterStateChannelPublishing.md docs/InterStateChannelResult.md +docs/IwfEvent.md docs/KeyValue.md docs/PersistenceLoadingPolicy.md docs/PersistenceLoadingType.md @@ -99,6 +101,7 @@ model_decider_trigger_type.go model_encoded_object.go model_error_response.go model_error_sub_status.go +model_event_type.go model_execute_api_failure_policy.go model_executing_state_id_mode.go model_health_info.go @@ -106,6 +109,7 @@ model_id_reuse_policy.go model_inter_state_channel_command.go model_inter_state_channel_publishing.go model_inter_state_channel_result.go +model_iwf_event.go model_key_value.go model_persistence_loading_policy.go model_persistence_loading_type.go diff --git a/gen/iwfidl/README.md b/gen/iwfidl/README.md index 6b4f2d9b..77482109 100644 --- a/gen/iwfidl/README.md +++ b/gen/iwfidl/README.md @@ -112,6 +112,7 @@ Class | Method | HTTP request | Description - [EncodedObject](docs/EncodedObject.md) - [ErrorResponse](docs/ErrorResponse.md) - [ErrorSubStatus](docs/ErrorSubStatus.md) + - [EventType](docs/EventType.md) - [ExecuteApiFailurePolicy](docs/ExecuteApiFailurePolicy.md) - [ExecutingStateIdMode](docs/ExecutingStateIdMode.md) - [HealthInfo](docs/HealthInfo.md) @@ -119,6 +120,7 @@ Class | Method | HTTP request | Description - [InterStateChannelCommand](docs/InterStateChannelCommand.md) - [InterStateChannelPublishing](docs/InterStateChannelPublishing.md) - [InterStateChannelResult](docs/InterStateChannelResult.md) + - [IwfEvent](docs/IwfEvent.md) - [KeyValue](docs/KeyValue.md) - [PersistenceLoadingPolicy](docs/PersistenceLoadingPolicy.md) - [PersistenceLoadingType](docs/PersistenceLoadingType.md) diff --git a/gen/iwfidl/api/openapi.yaml b/gen/iwfidl/api/openapi.yaml index 2c21c7b6..2c0a0288 100644 --- a/gen/iwfidl/api/openapi.yaml +++ b/gen/iwfidl/api/openapi.yaml @@ -3303,3 +3303,45 @@ components: - workflowRunId - workflowStartedTimestamp type: object + EventType: + enum: + - STATE_EXECUTE_ATTEMPT_FAIL_EVENT + - STATE_EXECUTE_ATTEMPT_SUCC_EVENT + - STATE_EXECUTE_EE_COMPLETE_EVENT + - STATE_EXECUTE_EE_FAIL_EVENT + - STATE_EXECUTE_EE_START_EVENT + - STATE_WAIT_UNTIL_ATTEMPT_FAIL_EVENT + - STATE_WAIT_UNTIL_ATTEMPT_SUCC_EVENT + - STATE_WAIT_UNTIL_EE_COMPLETE_EVENT + - STATE_WAIT_UNTIL_EE_FAIL_EVENT + - STATE_WAIT_UNTIL_EE_START_EVENT + - WORKFLOW_COMPLETE_EVENT + - WORKFLOW_FAIL_EVENT + - WORKFLOW_START_EVENT + type: string + IwfEvent: + properties: + eventType: + $ref: '#/components/schemas/EventType' + workflowType: + type: string + workflowId: + type: string + workflowRunId: + type: string + stateId: + type: string + stateExecutionId: + type: string + startTimestampInMs: + format: int64 + type: integer + endTimestampInMs: + format: int64 + type: integer + required: + - eventType + - workflowId + - workflowRunId + - workflowType + type: object diff --git a/gen/iwfidl/docs/EventType.md b/gen/iwfidl/docs/EventType.md new file mode 100644 index 00000000..3da20187 --- /dev/null +++ b/gen/iwfidl/docs/EventType.md @@ -0,0 +1,35 @@ +# EventType + +## Enum + + +* `STATE_EXECUTE_ATTEMPT_FAIL_EVENT` (value: `"STATE_EXECUTE_ATTEMPT_FAIL_EVENT"`) + +* `STATE_EXECUTE_ATTEMPT_SUCC_EVENT` (value: `"STATE_EXECUTE_ATTEMPT_SUCC_EVENT"`) + +* `STATE_EXECUTE_EE_COMPLETE_EVENT` (value: `"STATE_EXECUTE_EE_COMPLETE_EVENT"`) + +* `STATE_EXECUTE_EE_FAIL_EVENT` (value: `"STATE_EXECUTE_EE_FAIL_EVENT"`) + +* `STATE_EXECUTE_EE_START_EVENT` (value: `"STATE_EXECUTE_EE_START_EVENT"`) + +* `STATE_WAIT_UNTIL_ATTEMPT_FAIL_EVENT` (value: `"STATE_WAIT_UNTIL_ATTEMPT_FAIL_EVENT"`) + +* `STATE_WAIT_UNTIL_ATTEMPT_SUCC_EVENT` (value: `"STATE_WAIT_UNTIL_ATTEMPT_SUCC_EVENT"`) + +* `STATE_WAIT_UNTIL_EE_COMPLETE_EVENT` (value: `"STATE_WAIT_UNTIL_EE_COMPLETE_EVENT"`) + +* `STATE_WAIT_UNTIL_EE_FAIL_EVENT` (value: `"STATE_WAIT_UNTIL_EE_FAIL_EVENT"`) + +* `STATE_WAIT_UNTIL_EE_START_EVENT` (value: `"STATE_WAIT_UNTIL_EE_START_EVENT"`) + +* `WORKFLOW_COMPLETE_EVENT` (value: `"WORKFLOW_COMPLETE_EVENT"`) + +* `WORKFLOW_FAIL_EVENT` (value: `"WORKFLOW_FAIL_EVENT"`) + +* `WORKFLOW_START_EVENT` (value: `"WORKFLOW_START_EVENT"`) + + +[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) + + diff --git a/gen/iwfidl/docs/IwfEvent.md b/gen/iwfidl/docs/IwfEvent.md new file mode 100644 index 00000000..7877ba45 --- /dev/null +++ b/gen/iwfidl/docs/IwfEvent.md @@ -0,0 +1,218 @@ +# IwfEvent + +## Properties + +Name | Type | Description | Notes +------------ | ------------- | ------------- | ------------- +**EventType** | [**EventType**](EventType.md) | | +**WorkflowType** | **string** | | +**WorkflowId** | **string** | | +**WorkflowRunId** | **string** | | +**StateId** | Pointer to **string** | | [optional] +**StateExecutionId** | Pointer to **string** | | [optional] +**StartTimestampInMs** | Pointer to **int64** | | [optional] +**EndTimestampInMs** | Pointer to **int64** | | [optional] + +## Methods + +### NewIwfEvent + +`func NewIwfEvent(eventType EventType, workflowType string, workflowId string, workflowRunId string, ) *IwfEvent` + +NewIwfEvent instantiates a new IwfEvent object +This constructor will assign default values to properties that have it defined, +and makes sure properties required by API are set, but the set of arguments +will change when the set of required properties is changed + +### NewIwfEventWithDefaults + +`func NewIwfEventWithDefaults() *IwfEvent` + +NewIwfEventWithDefaults instantiates a new IwfEvent object +This constructor will only assign default values to properties that have it defined, +but it doesn't guarantee that properties required by API are set + +### GetEventType + +`func (o *IwfEvent) GetEventType() EventType` + +GetEventType returns the EventType field if non-nil, zero value otherwise. + +### GetEventTypeOk + +`func (o *IwfEvent) GetEventTypeOk() (*EventType, bool)` + +GetEventTypeOk returns a tuple with the EventType field if it's non-nil, zero value otherwise +and a boolean to check if the value has been set. + +### SetEventType + +`func (o *IwfEvent) SetEventType(v EventType)` + +SetEventType sets EventType field to given value. + + +### GetWorkflowType + +`func (o *IwfEvent) GetWorkflowType() string` + +GetWorkflowType returns the WorkflowType field if non-nil, zero value otherwise. + +### GetWorkflowTypeOk + +`func (o *IwfEvent) GetWorkflowTypeOk() (*string, bool)` + +GetWorkflowTypeOk returns a tuple with the WorkflowType field if it's non-nil, zero value otherwise +and a boolean to check if the value has been set. + +### SetWorkflowType + +`func (o *IwfEvent) SetWorkflowType(v string)` + +SetWorkflowType sets WorkflowType field to given value. + + +### GetWorkflowId + +`func (o *IwfEvent) GetWorkflowId() string` + +GetWorkflowId returns the WorkflowId field if non-nil, zero value otherwise. + +### GetWorkflowIdOk + +`func (o *IwfEvent) GetWorkflowIdOk() (*string, bool)` + +GetWorkflowIdOk returns a tuple with the WorkflowId field if it's non-nil, zero value otherwise +and a boolean to check if the value has been set. + +### SetWorkflowId + +`func (o *IwfEvent) SetWorkflowId(v string)` + +SetWorkflowId sets WorkflowId field to given value. + + +### GetWorkflowRunId + +`func (o *IwfEvent) GetWorkflowRunId() string` + +GetWorkflowRunId returns the WorkflowRunId field if non-nil, zero value otherwise. + +### GetWorkflowRunIdOk + +`func (o *IwfEvent) GetWorkflowRunIdOk() (*string, bool)` + +GetWorkflowRunIdOk returns a tuple with the WorkflowRunId field if it's non-nil, zero value otherwise +and a boolean to check if the value has been set. + +### SetWorkflowRunId + +`func (o *IwfEvent) SetWorkflowRunId(v string)` + +SetWorkflowRunId sets WorkflowRunId field to given value. + + +### GetStateId + +`func (o *IwfEvent) GetStateId() string` + +GetStateId returns the StateId field if non-nil, zero value otherwise. + +### GetStateIdOk + +`func (o *IwfEvent) GetStateIdOk() (*string, bool)` + +GetStateIdOk returns a tuple with the StateId field if it's non-nil, zero value otherwise +and a boolean to check if the value has been set. + +### SetStateId + +`func (o *IwfEvent) SetStateId(v string)` + +SetStateId sets StateId field to given value. + +### HasStateId + +`func (o *IwfEvent) HasStateId() bool` + +HasStateId returns a boolean if a field has been set. + +### GetStateExecutionId + +`func (o *IwfEvent) GetStateExecutionId() string` + +GetStateExecutionId returns the StateExecutionId field if non-nil, zero value otherwise. + +### GetStateExecutionIdOk + +`func (o *IwfEvent) GetStateExecutionIdOk() (*string, bool)` + +GetStateExecutionIdOk returns a tuple with the StateExecutionId field if it's non-nil, zero value otherwise +and a boolean to check if the value has been set. + +### SetStateExecutionId + +`func (o *IwfEvent) SetStateExecutionId(v string)` + +SetStateExecutionId sets StateExecutionId field to given value. + +### HasStateExecutionId + +`func (o *IwfEvent) HasStateExecutionId() bool` + +HasStateExecutionId returns a boolean if a field has been set. + +### GetStartTimestampInMs + +`func (o *IwfEvent) GetStartTimestampInMs() int64` + +GetStartTimestampInMs returns the StartTimestampInMs field if non-nil, zero value otherwise. + +### GetStartTimestampInMsOk + +`func (o *IwfEvent) GetStartTimestampInMsOk() (*int64, bool)` + +GetStartTimestampInMsOk returns a tuple with the StartTimestampInMs field if it's non-nil, zero value otherwise +and a boolean to check if the value has been set. + +### SetStartTimestampInMs + +`func (o *IwfEvent) SetStartTimestampInMs(v int64)` + +SetStartTimestampInMs sets StartTimestampInMs field to given value. + +### HasStartTimestampInMs + +`func (o *IwfEvent) HasStartTimestampInMs() bool` + +HasStartTimestampInMs returns a boolean if a field has been set. + +### GetEndTimestampInMs + +`func (o *IwfEvent) GetEndTimestampInMs() int64` + +GetEndTimestampInMs returns the EndTimestampInMs field if non-nil, zero value otherwise. + +### GetEndTimestampInMsOk + +`func (o *IwfEvent) GetEndTimestampInMsOk() (*int64, bool)` + +GetEndTimestampInMsOk returns a tuple with the EndTimestampInMs field if it's non-nil, zero value otherwise +and a boolean to check if the value has been set. + +### SetEndTimestampInMs + +`func (o *IwfEvent) SetEndTimestampInMs(v int64)` + +SetEndTimestampInMs sets EndTimestampInMs field to given value. + +### HasEndTimestampInMs + +`func (o *IwfEvent) HasEndTimestampInMs() bool` + +HasEndTimestampInMs returns a boolean if a field has been set. + + +[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) + + diff --git a/gen/iwfidl/model_event_type.go b/gen/iwfidl/model_event_type.go new file mode 100644 index 00000000..35684a6f --- /dev/null +++ b/gen/iwfidl/model_event_type.go @@ -0,0 +1,132 @@ +/* +Workflow APIs + +This APIs for iwf SDKs to operate workflows + +API version: 1.0.0 +*/ + +// Code generated by OpenAPI Generator (https://openapi-generator.tech); DO NOT EDIT. + +package iwfidl + +import ( + "encoding/json" + "fmt" +) + +// EventType the model 'EventType' +type EventType string + +// List of EventType +const ( + STATE_EXECUTE_ATTEMPT_FAIL_EVENT EventType = "STATE_EXECUTE_ATTEMPT_FAIL_EVENT" + STATE_EXECUTE_ATTEMPT_SUCC_EVENT EventType = "STATE_EXECUTE_ATTEMPT_SUCC_EVENT" + STATE_EXECUTE_EE_COMPLETE_EVENT EventType = "STATE_EXECUTE_EE_COMPLETE_EVENT" + STATE_EXECUTE_EE_FAIL_EVENT EventType = "STATE_EXECUTE_EE_FAIL_EVENT" + STATE_EXECUTE_EE_START_EVENT EventType = "STATE_EXECUTE_EE_START_EVENT" + STATE_WAIT_UNTIL_ATTEMPT_FAIL_EVENT EventType = "STATE_WAIT_UNTIL_ATTEMPT_FAIL_EVENT" + STATE_WAIT_UNTIL_ATTEMPT_SUCC_EVENT EventType = "STATE_WAIT_UNTIL_ATTEMPT_SUCC_EVENT" + STATE_WAIT_UNTIL_EE_COMPLETE_EVENT EventType = "STATE_WAIT_UNTIL_EE_COMPLETE_EVENT" + STATE_WAIT_UNTIL_EE_FAIL_EVENT EventType = "STATE_WAIT_UNTIL_EE_FAIL_EVENT" + STATE_WAIT_UNTIL_EE_START_EVENT EventType = "STATE_WAIT_UNTIL_EE_START_EVENT" + WORKFLOW_COMPLETE_EVENT EventType = "WORKFLOW_COMPLETE_EVENT" + WORKFLOW_FAIL_EVENT EventType = "WORKFLOW_FAIL_EVENT" + WORKFLOW_START_EVENT EventType = "WORKFLOW_START_EVENT" +) + +// All allowed values of EventType enum +var AllowedEventTypeEnumValues = []EventType{ + "STATE_EXECUTE_ATTEMPT_FAIL_EVENT", + "STATE_EXECUTE_ATTEMPT_SUCC_EVENT", + "STATE_EXECUTE_EE_COMPLETE_EVENT", + "STATE_EXECUTE_EE_FAIL_EVENT", + "STATE_EXECUTE_EE_START_EVENT", + "STATE_WAIT_UNTIL_ATTEMPT_FAIL_EVENT", + "STATE_WAIT_UNTIL_ATTEMPT_SUCC_EVENT", + "STATE_WAIT_UNTIL_EE_COMPLETE_EVENT", + "STATE_WAIT_UNTIL_EE_FAIL_EVENT", + "STATE_WAIT_UNTIL_EE_START_EVENT", + "WORKFLOW_COMPLETE_EVENT", + "WORKFLOW_FAIL_EVENT", + "WORKFLOW_START_EVENT", +} + +func (v *EventType) UnmarshalJSON(src []byte) error { + var value string + err := json.Unmarshal(src, &value) + if err != nil { + return err + } + enumTypeValue := EventType(value) + for _, existing := range AllowedEventTypeEnumValues { + if existing == enumTypeValue { + *v = enumTypeValue + return nil + } + } + + return fmt.Errorf("%+v is not a valid EventType", value) +} + +// NewEventTypeFromValue returns a pointer to a valid EventType +// for the value passed as argument, or an error if the value passed is not allowed by the enum +func NewEventTypeFromValue(v string) (*EventType, error) { + ev := EventType(v) + if ev.IsValid() { + return &ev, nil + } else { + return nil, fmt.Errorf("invalid value '%v' for EventType: valid values are %v", v, AllowedEventTypeEnumValues) + } +} + +// IsValid return true if the value is valid for the enum, false otherwise +func (v EventType) IsValid() bool { + for _, existing := range AllowedEventTypeEnumValues { + if existing == v { + return true + } + } + return false +} + +// Ptr returns reference to EventType value +func (v EventType) Ptr() *EventType { + return &v +} + +type NullableEventType struct { + value *EventType + isSet bool +} + +func (v NullableEventType) Get() *EventType { + return v.value +} + +func (v *NullableEventType) Set(val *EventType) { + v.value = val + v.isSet = true +} + +func (v NullableEventType) IsSet() bool { + return v.isSet +} + +func (v *NullableEventType) Unset() { + v.value = nil + v.isSet = false +} + +func NewNullableEventType(val *EventType) *NullableEventType { + return &NullableEventType{value: val, isSet: true} +} + +func (v NullableEventType) MarshalJSON() ([]byte, error) { + return json.Marshal(v.value) +} + +func (v *NullableEventType) UnmarshalJSON(src []byte) error { + v.isSet = true + return json.Unmarshal(src, &v.value) +} diff --git a/gen/iwfidl/model_iwf_event.go b/gen/iwfidl/model_iwf_event.go new file mode 100644 index 00000000..ceac2424 --- /dev/null +++ b/gen/iwfidl/model_iwf_event.go @@ -0,0 +1,340 @@ +/* +Workflow APIs + +This APIs for iwf SDKs to operate workflows + +API version: 1.0.0 +*/ + +// Code generated by OpenAPI Generator (https://openapi-generator.tech); DO NOT EDIT. + +package iwfidl + +import ( + "encoding/json" +) + +// checks if the IwfEvent type satisfies the MappedNullable interface at compile time +var _ MappedNullable = &IwfEvent{} + +// IwfEvent struct for IwfEvent +type IwfEvent struct { + EventType EventType `json:"eventType"` + WorkflowType string `json:"workflowType"` + WorkflowId string `json:"workflowId"` + WorkflowRunId string `json:"workflowRunId"` + StateId *string `json:"stateId,omitempty"` + StateExecutionId *string `json:"stateExecutionId,omitempty"` + StartTimestampInMs *int64 `json:"startTimestampInMs,omitempty"` + EndTimestampInMs *int64 `json:"endTimestampInMs,omitempty"` +} + +// NewIwfEvent instantiates a new IwfEvent object +// This constructor will assign default values to properties that have it defined, +// and makes sure properties required by API are set, but the set of arguments +// will change when the set of required properties is changed +func NewIwfEvent(eventType EventType, workflowType string, workflowId string, workflowRunId string) *IwfEvent { + this := IwfEvent{} + this.EventType = eventType + this.WorkflowType = workflowType + this.WorkflowId = workflowId + this.WorkflowRunId = workflowRunId + return &this +} + +// NewIwfEventWithDefaults instantiates a new IwfEvent object +// This constructor will only assign default values to properties that have it defined, +// but it doesn't guarantee that properties required by API are set +func NewIwfEventWithDefaults() *IwfEvent { + this := IwfEvent{} + return &this +} + +// GetEventType returns the EventType field value +func (o *IwfEvent) GetEventType() EventType { + if o == nil { + var ret EventType + return ret + } + + return o.EventType +} + +// GetEventTypeOk returns a tuple with the EventType field value +// and a boolean to check if the value has been set. +func (o *IwfEvent) GetEventTypeOk() (*EventType, bool) { + if o == nil { + return nil, false + } + return &o.EventType, true +} + +// SetEventType sets field value +func (o *IwfEvent) SetEventType(v EventType) { + o.EventType = v +} + +// GetWorkflowType returns the WorkflowType field value +func (o *IwfEvent) GetWorkflowType() string { + if o == nil { + var ret string + return ret + } + + return o.WorkflowType +} + +// GetWorkflowTypeOk returns a tuple with the WorkflowType field value +// and a boolean to check if the value has been set. +func (o *IwfEvent) GetWorkflowTypeOk() (*string, bool) { + if o == nil { + return nil, false + } + return &o.WorkflowType, true +} + +// SetWorkflowType sets field value +func (o *IwfEvent) SetWorkflowType(v string) { + o.WorkflowType = v +} + +// GetWorkflowId returns the WorkflowId field value +func (o *IwfEvent) GetWorkflowId() string { + if o == nil { + var ret string + return ret + } + + return o.WorkflowId +} + +// GetWorkflowIdOk returns a tuple with the WorkflowId field value +// and a boolean to check if the value has been set. +func (o *IwfEvent) GetWorkflowIdOk() (*string, bool) { + if o == nil { + return nil, false + } + return &o.WorkflowId, true +} + +// SetWorkflowId sets field value +func (o *IwfEvent) SetWorkflowId(v string) { + o.WorkflowId = v +} + +// GetWorkflowRunId returns the WorkflowRunId field value +func (o *IwfEvent) GetWorkflowRunId() string { + if o == nil { + var ret string + return ret + } + + return o.WorkflowRunId +} + +// GetWorkflowRunIdOk returns a tuple with the WorkflowRunId field value +// and a boolean to check if the value has been set. +func (o *IwfEvent) GetWorkflowRunIdOk() (*string, bool) { + if o == nil { + return nil, false + } + return &o.WorkflowRunId, true +} + +// SetWorkflowRunId sets field value +func (o *IwfEvent) SetWorkflowRunId(v string) { + o.WorkflowRunId = v +} + +// GetStateId returns the StateId field value if set, zero value otherwise. +func (o *IwfEvent) GetStateId() string { + if o == nil || IsNil(o.StateId) { + var ret string + return ret + } + return *o.StateId +} + +// GetStateIdOk returns a tuple with the StateId field value if set, nil otherwise +// and a boolean to check if the value has been set. +func (o *IwfEvent) GetStateIdOk() (*string, bool) { + if o == nil || IsNil(o.StateId) { + return nil, false + } + return o.StateId, true +} + +// HasStateId returns a boolean if a field has been set. +func (o *IwfEvent) HasStateId() bool { + if o != nil && !IsNil(o.StateId) { + return true + } + + return false +} + +// SetStateId gets a reference to the given string and assigns it to the StateId field. +func (o *IwfEvent) SetStateId(v string) { + o.StateId = &v +} + +// GetStateExecutionId returns the StateExecutionId field value if set, zero value otherwise. +func (o *IwfEvent) GetStateExecutionId() string { + if o == nil || IsNil(o.StateExecutionId) { + var ret string + return ret + } + return *o.StateExecutionId +} + +// GetStateExecutionIdOk returns a tuple with the StateExecutionId field value if set, nil otherwise +// and a boolean to check if the value has been set. +func (o *IwfEvent) GetStateExecutionIdOk() (*string, bool) { + if o == nil || IsNil(o.StateExecutionId) { + return nil, false + } + return o.StateExecutionId, true +} + +// HasStateExecutionId returns a boolean if a field has been set. +func (o *IwfEvent) HasStateExecutionId() bool { + if o != nil && !IsNil(o.StateExecutionId) { + return true + } + + return false +} + +// SetStateExecutionId gets a reference to the given string and assigns it to the StateExecutionId field. +func (o *IwfEvent) SetStateExecutionId(v string) { + o.StateExecutionId = &v +} + +// GetStartTimestampInMs returns the StartTimestampInMs field value if set, zero value otherwise. +func (o *IwfEvent) GetStartTimestampInMs() int64 { + if o == nil || IsNil(o.StartTimestampInMs) { + var ret int64 + return ret + } + return *o.StartTimestampInMs +} + +// GetStartTimestampInMsOk returns a tuple with the StartTimestampInMs field value if set, nil otherwise +// and a boolean to check if the value has been set. +func (o *IwfEvent) GetStartTimestampInMsOk() (*int64, bool) { + if o == nil || IsNil(o.StartTimestampInMs) { + return nil, false + } + return o.StartTimestampInMs, true +} + +// HasStartTimestampInMs returns a boolean if a field has been set. +func (o *IwfEvent) HasStartTimestampInMs() bool { + if o != nil && !IsNil(o.StartTimestampInMs) { + return true + } + + return false +} + +// SetStartTimestampInMs gets a reference to the given int64 and assigns it to the StartTimestampInMs field. +func (o *IwfEvent) SetStartTimestampInMs(v int64) { + o.StartTimestampInMs = &v +} + +// GetEndTimestampInMs returns the EndTimestampInMs field value if set, zero value otherwise. +func (o *IwfEvent) GetEndTimestampInMs() int64 { + if o == nil || IsNil(o.EndTimestampInMs) { + var ret int64 + return ret + } + return *o.EndTimestampInMs +} + +// GetEndTimestampInMsOk returns a tuple with the EndTimestampInMs field value if set, nil otherwise +// and a boolean to check if the value has been set. +func (o *IwfEvent) GetEndTimestampInMsOk() (*int64, bool) { + if o == nil || IsNil(o.EndTimestampInMs) { + return nil, false + } + return o.EndTimestampInMs, true +} + +// HasEndTimestampInMs returns a boolean if a field has been set. +func (o *IwfEvent) HasEndTimestampInMs() bool { + if o != nil && !IsNil(o.EndTimestampInMs) { + return true + } + + return false +} + +// SetEndTimestampInMs gets a reference to the given int64 and assigns it to the EndTimestampInMs field. +func (o *IwfEvent) SetEndTimestampInMs(v int64) { + o.EndTimestampInMs = &v +} + +func (o IwfEvent) MarshalJSON() ([]byte, error) { + toSerialize, err := o.ToMap() + if err != nil { + return []byte{}, err + } + return json.Marshal(toSerialize) +} + +func (o IwfEvent) ToMap() (map[string]interface{}, error) { + toSerialize := map[string]interface{}{} + toSerialize["eventType"] = o.EventType + toSerialize["workflowType"] = o.WorkflowType + toSerialize["workflowId"] = o.WorkflowId + toSerialize["workflowRunId"] = o.WorkflowRunId + if !IsNil(o.StateId) { + toSerialize["stateId"] = o.StateId + } + if !IsNil(o.StateExecutionId) { + toSerialize["stateExecutionId"] = o.StateExecutionId + } + if !IsNil(o.StartTimestampInMs) { + toSerialize["startTimestampInMs"] = o.StartTimestampInMs + } + if !IsNil(o.EndTimestampInMs) { + toSerialize["endTimestampInMs"] = o.EndTimestampInMs + } + return toSerialize, nil +} + +type NullableIwfEvent struct { + value *IwfEvent + isSet bool +} + +func (v NullableIwfEvent) Get() *IwfEvent { + return v.value +} + +func (v *NullableIwfEvent) Set(val *IwfEvent) { + v.value = val + v.isSet = true +} + +func (v NullableIwfEvent) IsSet() bool { + return v.isSet +} + +func (v *NullableIwfEvent) Unset() { + v.value = nil + v.isSet = false +} + +func NewNullableIwfEvent(val *IwfEvent) *NullableIwfEvent { + return &NullableIwfEvent{value: val, isSet: true} +} + +func (v NullableIwfEvent) MarshalJSON() ([]byte, error) { + return json.Marshal(v.value) +} + +func (v *NullableIwfEvent) UnmarshalJSON(src []byte) error { + v.isSet = true + return json.Unmarshal(src, &v.value) +} diff --git a/iwf-idl b/iwf-idl index 343c4032..75978e48 160000 --- a/iwf-idl +++ b/iwf-idl @@ -1 +1 @@ -Subproject commit 343c403214e66fb9878a1ea873d954731129b0d6 +Subproject commit 75978e48d6a929a2bb761e5c8f67d6caac99d658 diff --git a/service/common/event/event.go b/service/common/event/event.go new file mode 100644 index 00000000..0a461813 --- /dev/null +++ b/service/common/event/event.go @@ -0,0 +1,16 @@ +package event + +import "github.com/indeedeng/iwf/gen/iwfidl" + +// The implementation must be lightweight, reliable and fast (less than 1s) +type HandleEventFunc func(event iwfidl.IwfEvent) + +var Handle HandleEventFunc = DefaultHandleEventFunc + +func SetHandleEventFunc(handler HandleEventFunc) { + Handle = handler +} + +func DefaultHandleEventFunc(event iwfidl.IwfEvent) { + // Noop by default +} diff --git a/service/interpreter/activityImpl.go b/service/interpreter/activityImpl.go index 60dc3866..a1ea6ed9 100644 --- a/service/interpreter/activityImpl.go +++ b/service/interpreter/activityImpl.go @@ -6,6 +6,8 @@ import ( "github.com/indeedeng/iwf/gen/iwfidl" "github.com/indeedeng/iwf/service" "github.com/indeedeng/iwf/service/common/compatibility" + "github.com/indeedeng/iwf/service/common/event" + "github.com/indeedeng/iwf/service/common/ptr" "github.com/indeedeng/iwf/service/common/rpc" "github.com/indeedeng/iwf/service/common/urlautofix" "github.com/indeedeng/iwf/service/interpreter/env" @@ -13,6 +15,7 @@ import ( "net/http" "os" "slices" + "time" ) // StateStart is Deprecated, will be removed in next release @@ -25,6 +28,7 @@ func StateStart( func StateApiWaitUntil( ctx context.Context, backendType service.BackendType, input service.StateStartActivityInput, ) (*iwfidl.WorkflowStateStartResponse, error) { + stateApiWaitUntilStartTime := time.Now().UnixMilli() provider := getActivityProviderByType(backendType) logger := provider.GetLogger(ctx) logger.Info("StateStartActivity", "input", input) @@ -50,12 +54,24 @@ func StateApiWaitUntil( resp, httpResp, err := req.WorkflowStateStartRequest(input.Request).Execute() printDebugMsg(logger, err, iwfWorkerBaseUrl) if checkHttpError(err, httpResp) { + event.Handle(iwfidl.IwfEvent{ + EventType: iwfidl.STATE_WAIT_UNTIL_ATTEMPT_FAIL_EVENT, + WorkflowType: input.Request.WorkflowType, + WorkflowId: activityInfo.WorkflowExecution.ID, + WorkflowRunId: activityInfo.WorkflowExecution.RunID, + }) return nil, composeHttpError( activityInfo.IsLocalActivity, provider, err, httpResp, string(iwfidl.STATE_API_FAIL_MAX_OUT_RETRY_ERROR_TYPE)) } if err := checkCommandRequestFromWaitUntilResponse(resp); err != nil { + event.Handle(iwfidl.IwfEvent{ + EventType: iwfidl.STATE_WAIT_UNTIL_ATTEMPT_FAIL_EVENT, + WorkflowType: input.Request.WorkflowType, + WorkflowId: activityInfo.WorkflowExecution.ID, + WorkflowRunId: activityInfo.WorkflowExecution.RunID, + }) return nil, composeStartApiRespError(provider, err, resp) } @@ -65,6 +81,15 @@ func StateApiWaitUntil( if activityInfo.IsLocalActivity { resp.LocalActivityInput = composeInputForDebug(input.Request.Context.GetStateExecutionId()) } + + event.Handle(iwfidl.IwfEvent{ + EventType: iwfidl.STATE_WAIT_UNTIL_ATTEMPT_SUCC_EVENT, + WorkflowType: input.Request.WorkflowType, + WorkflowId: activityInfo.WorkflowExecution.ID, + WorkflowRunId: activityInfo.WorkflowExecution.RunID, + StartTimestampInMs: ptr.Any(stateApiWaitUntilStartTime), + EndTimestampInMs: ptr.Any(time.Now().UnixMilli()), + }) return resp, nil } @@ -82,6 +107,7 @@ func StateApiExecute( backendType service.BackendType, input service.StateDecideActivityInput, ) (*iwfidl.WorkflowStateDecideResponse, error) { + stateApiExecuteStartTime := time.Now().UnixMilli() provider := getActivityProviderByType(backendType) logger := provider.GetLogger(ctx) logger.Info("StateDecideActivity", "input", input) @@ -107,12 +133,28 @@ func StateApiExecute( resp, httpResp, err := req.WorkflowStateDecideRequest(input.Request).Execute() printDebugMsg(logger, err, iwfWorkerBaseUrl) if checkHttpError(err, httpResp) { + event.Handle(iwfidl.IwfEvent{ + EventType: iwfidl.STATE_EXECUTE_ATTEMPT_FAIL_EVENT, + WorkflowType: input.Request.WorkflowType, + WorkflowId: activityInfo.WorkflowExecution.ID, + WorkflowRunId: activityInfo.WorkflowExecution.RunID, + StateId: ptr.Any(input.Request.WorkflowStateId), + StateExecutionId: input.Request.Context.StateExecutionId, + }) return nil, composeHttpError( activityInfo.IsLocalActivity, provider, err, httpResp, string(iwfidl.STATE_API_FAIL_MAX_OUT_RETRY_ERROR_TYPE)) } if err = checkStateDecisionFromResponse(resp); err != nil { + event.Handle(iwfidl.IwfEvent{ + EventType: iwfidl.STATE_EXECUTE_ATTEMPT_FAIL_EVENT, + WorkflowType: input.Request.WorkflowType, + WorkflowId: activityInfo.WorkflowExecution.ID, + WorkflowRunId: activityInfo.WorkflowExecution.RunID, + StateId: ptr.Any(input.Request.WorkflowStateId), + StateExecutionId: input.Request.Context.StateExecutionId, + }) return nil, composeExecuteApiRespError(provider, err, resp) } @@ -123,6 +165,16 @@ func StateApiExecute( resp.LocalActivityInput = composeInputForDebug(input.Request.Context.GetStateExecutionId()) } + event.Handle(iwfidl.IwfEvent{ + EventType: iwfidl.STATE_EXECUTE_ATTEMPT_SUCC_EVENT, + WorkflowType: input.Request.WorkflowType, + WorkflowId: activityInfo.WorkflowExecution.ID, + WorkflowRunId: activityInfo.WorkflowExecution.RunID, + StateId: ptr.Any(input.Request.WorkflowStateId), + StateExecutionId: input.Request.Context.StateExecutionId, + StartTimestampInMs: ptr.Any(stateApiExecuteStartTime), + EndTimestampInMs: ptr.Any(time.Now().UnixMilli()), + }) return resp, nil } diff --git a/service/interpreter/cadence/activityProvider.go b/service/interpreter/cadence/activityProvider.go index 0e16ee85..b8a4496c 100644 --- a/service/interpreter/cadence/activityProvider.go +++ b/service/interpreter/cadence/activityProvider.go @@ -31,5 +31,9 @@ func (a *activityProvider) GetActivityInfo(ctx context.Context) interpreter.Acti ScheduledTime: info.ScheduledTimestamp, Attempt: info.Attempt + 1, // NOTE increase by one to match Temporal IsLocalActivity: false, // TODO cadence doesn't support this yet + WorkflowExecution: interpreter.WorkflowExecution{ + ID: info.WorkflowExecution.ID, + RunID: info.WorkflowExecution.RunID, + }, } } diff --git a/service/interpreter/interfaces.go b/service/interpreter/interfaces.go index 2b68db80..beb871b2 100644 --- a/service/interpreter/interfaces.go +++ b/service/interpreter/interfaces.go @@ -16,9 +16,10 @@ type ActivityProvider interface { } type ActivityInfo struct { - ScheduledTime time.Time // Time of activity scheduled by a workflow - Attempt int32 // Attempt starts from 1, and increased by 1 for every retry if retry policy is specified. - IsLocalActivity bool // Whether the activity is at local activity + ScheduledTime time.Time // Time of activity scheduled by a workflow + Attempt int32 // Attempt starts from 1, and increased by 1 for every retry if retry policy is specified. + IsLocalActivity bool // Whether the activity is at local activity + WorkflowExecution WorkflowExecution } var activityProviderRegistry = make(map[service.BackendType]ActivityProvider) diff --git a/service/interpreter/temporal/activityProvider.go b/service/interpreter/temporal/activityProvider.go index 49152c41..d8169791 100644 --- a/service/interpreter/temporal/activityProvider.go +++ b/service/interpreter/temporal/activityProvider.go @@ -28,5 +28,9 @@ func (a *activityProvider) GetActivityInfo(ctx context.Context) interpreter.Acti ScheduledTime: info.ScheduledTime, Attempt: info.Attempt, IsLocalActivity: info.IsLocalActivity, + WorkflowExecution: interpreter.WorkflowExecution{ + ID: info.WorkflowExecution.ID, + RunID: info.WorkflowExecution.RunID, + }, } } diff --git a/service/interpreter/workflowImpl.go b/service/interpreter/workflowImpl.go index eaf9bdce..189c57ec 100644 --- a/service/interpreter/workflowImpl.go +++ b/service/interpreter/workflowImpl.go @@ -4,6 +4,8 @@ import ( "context" "fmt" uclient "github.com/indeedeng/iwf/service/client" + "github.com/indeedeng/iwf/service/common/event" + "github.com/indeedeng/iwf/service/common/ptr" "github.com/indeedeng/iwf/service/common/utils" "github.com/indeedeng/iwf/service/interpreter/env" "time" @@ -17,17 +19,43 @@ import ( func InterpreterImpl( ctx UnifiedContext, provider WorkflowProvider, input service.InterpreterWorkflowInput, -) (*service.InterpreterWorkflowOutput, error) { +) (output *service.InterpreterWorkflowOutput, retErr error) { + defer func() { + if !provider.IsReplaying(ctx) { + // send metrics for the workflow result + if retErr == nil { + event.Handle(iwfidl.IwfEvent{ + EventType: iwfidl.WORKFLOW_COMPLETE_EVENT, + WorkflowType: input.IwfWorkflowType, + WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID, + WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID, + StartTimestampInMs: ptr.Any(provider.GetWorkflowInfo(ctx).WorkflowStartTime.UnixMilli()), + EndTimestampInMs: ptr.Any(provider.Now(ctx).UnixMilli()), + }) + } else if provider.IsApplicationError(retErr) { + event.Handle(iwfidl.IwfEvent{ + EventType: iwfidl.WORKFLOW_FAIL_EVENT, + WorkflowType: input.IwfWorkflowType, + WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID, + WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID, + }) + } + } + }() + var err error + globalVersioner, err := NewGlobalVersioner(provider, input.OmitVersionMarker != nil && *input.OmitVersionMarker, ctx) if err != nil { - return nil, err + retErr = err + return } if globalVersioner.IsAfterVersionOfUsingGlobalVersioning() { err = globalVersioner.UpsertGlobalVersionSearchAttribute() if err != nil { - return nil, err + retErr = err + return } } @@ -38,7 +66,8 @@ func InterpreterImpl( service.SearchAttributeIwfWorkflowType: input.IwfWorkflowType, }) if err != nil { - return nil, err + retErr = err + return } } } @@ -63,7 +92,8 @@ func InterpreterImpl( config := workflowConfiger.Get() previous, err := LoadInternalsFromPreviousRun(ctx, provider, canInput.PreviousInternalRunId, config.GetContinueAsNewPageSizeInBytes()) if err != nil { - return nil, err + retErr = err + return } // The below initialization order should be the same as for non-continueAsNew @@ -94,7 +124,8 @@ func InterpreterImpl( _, err = NewWorkflowUpdater(ctx, provider, persistenceManager, stateRequestQueue, continueAsNewer, continueAsNewCounter, workflowConfiger, interStateChannel, basicInfo, globalVersioner) if err != nil { - return nil, err + retErr = err + return } // We intentionally set the query handler after the continueAsNew/dumpInternal activity. // This is to ensure the correctness. If we set the query handler before that, @@ -102,7 +133,8 @@ func InterpreterImpl( // We would rather return server errors and let the client retry later. err = SetQueryHandlers(ctx, provider, persistenceManager, continueAsNewer, workflowConfiger, basicInfo) if err != nil { - return nil, err + retErr = err + return } var errToFailWf error // Note that today different errors could overwrite each other, we only support last one wins. we may use multiError to improve. @@ -114,6 +146,14 @@ func InterpreterImpl( defer stateExecutionCounter.ClearExecutingStateIdsSearchAttributeFinally() if !input.IsResumeFromContinueAsNew { + if !provider.IsReplaying(ctx) { + event.Handle(iwfidl.IwfEvent{ + EventType: iwfidl.WORKFLOW_START_EVENT, + WorkflowType: input.IwfWorkflowType, + WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID, + WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID, + }) + } // it's possible that a workflow is started without any starting state // it will wait for a new state coming in (by RPC results) if input.StartStateId != nil { @@ -137,11 +177,13 @@ func InterpreterImpl( return !stateRequestQueue.IsEmpty() || failWorkflowByClient || shouldGracefulComplete }) if err != nil { - return nil, err + retErr = err + return } failWorkflowByClient, failErr := signalReceiver.IsFailWorkflowRequested() if failWorkflowByClient { - return nil, failErr + retErr = failErr + return } if shouldGracefulComplete && stateRequestQueue.IsEmpty() { break @@ -154,7 +196,8 @@ func InterpreterImpl( statesToExecute = stateRequestQueue.TakeAll() err = stateExecutionCounter.MarkStateIdExecutingIfNotYet(statesToExecute) if err != nil { - return nil, err + retErr = err + return } } @@ -187,7 +230,7 @@ func InterpreterImpl( slices.Contains(input.WaitForCompletionStateExecutionIds, stateExeId) || slices.Contains(input.WaitForCompletionStateIds, state.GetStateId()) - decision, stateExecStatus, err := executeState( + decision, stateExecStatus, err := processStateExecution( ctx, provider, globalVersioner, basicInfo, stateReq, stateExeId, persistenceManager, interStateChannel, signalReceiver, timerProcessor, continueAsNewer, continueAsNewCounter, workflowConfiger, shouldSendSignalOnCompletion) if err != nil { @@ -269,9 +312,11 @@ func InterpreterImpl( } if errToFailWf != nil || forceCompleteWf { - return &service.InterpreterWorkflowOutput{ + output = &service.InterpreterWorkflowOutput{ StateCompletionOutputs: outputCollector.GetAll(), - }, errToFailWf + } + retErr = errToFailWf + return } if awaitError != nil { @@ -292,7 +337,6 @@ func InterpreterImpl( errToFailWf = err break } - // NOTE: This must be the last thing before continueAsNew!!! // Otherwise, there could be signals unhandled signalReceiver.DrainAllUnreceivedSignals(ctx) @@ -301,9 +345,11 @@ func InterpreterImpl( // last fail workflow signal, return the workflow so that we don't carry over the fail request failByApi, failErr := signalReceiver.IsFailWorkflowRequested() if failByApi { - return &service.InterpreterWorkflowOutput{ + output = &service.InterpreterWorkflowOutput{ StateCompletionOutputs: outputCollector.GetAll(), - }, failErr + } + retErr = failErr + return } if stateRequestQueue.IsEmpty() && !continueAsNewer.HasAnyStateExecutionToResume() && shouldGracefulComplete { // if it is empty and no stateExecutionsToResume and request a graceful complete just complete the loop @@ -311,7 +357,7 @@ func InterpreterImpl( break } // last update config, do it here because we use input to carry over config, not continueAsNewer query - input.Config = workflowConfiger.Get() // update config to the lastest before continueAsNew to carry over + input.Config = workflowConfiger.Get() // update config to the latest before continueAsNew to carry over input.IsResumeFromContinueAsNew = true input.ContinueAsNewInput = &service.ContinueAsNewInput{ PreviousInternalRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID, @@ -322,14 +368,17 @@ func InterpreterImpl( input.StartStateId = nil input.InitDataAttributes = nil input.InitSearchAttributes = nil - return nil, provider.NewInterpreterContinueAsNewError(ctx, input) + retErr = provider.NewInterpreterContinueAsNewError(ctx, input) + return } } // end main loop // gracefully complete workflow when all states are executed to dead ends - return &service.InterpreterWorkflowOutput{ + output = &service.InterpreterWorkflowOutput{ StateCompletionOutputs: outputCollector.GetAll(), - }, errToFailWf + } + retErr = errToFailWf + return } func checkClosingWorkflow( @@ -438,7 +487,7 @@ func checkClosingWorkflow( return } -func executeState( +func processStateExecution( ctx UnifiedContext, provider WorkflowProvider, globalVersioner *GlobalVersioner, @@ -511,6 +560,15 @@ func executeState( saLoadingPolicy := compatibility.GetWaitUntilApiSearchAttributesLoadingPolicy(state.StateOptions) doLoadingPolicy := compatibility.GetWaitUntilApiDataObjectsLoadingPolicy(state.StateOptions) + if !provider.IsReplaying(ctx) { + event.Handle(iwfidl.IwfEvent{ + EventType: iwfidl.STATE_WAIT_UNTIL_EE_START_EVENT, + WorkflowType: basicInfo.IwfWorkflowType, + WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID, + WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID, + }) + } + stateWaitUntilApiStartTime := provider.Now(ctx).UnixMilli() errStartApi = provider.ExecuteActivity(&startResponse, configer.ShouldOptimizeActivity(), ctx, waitUntilApi, provider.GetBackendType(), service.StateStartActivityInput{ IwfWorkerUrl: basicInfo.IwfWorkerUrl, @@ -523,6 +581,26 @@ func executeState( DataObjects: persistenceManager.LoadDataObjects(ctx, doLoadingPolicy), }, }) + if !provider.IsReplaying(ctx) { + if errStartApi == nil { + event.Handle(iwfidl.IwfEvent{ + EventType: iwfidl.STATE_WAIT_UNTIL_EE_FAIL_EVENT, + WorkflowType: basicInfo.IwfWorkflowType, + WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID, + WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID, + }) + } else { + event.Handle(iwfidl.IwfEvent{ + EventType: iwfidl.STATE_WAIT_UNTIL_EE_COMPLETE_EVENT, + WorkflowType: basicInfo.IwfWorkflowType, + WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID, + WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID, + StartTimestampInMs: ptr.Any(stateWaitUntilApiStartTime), + EndTimestampInMs: ptr.Any(provider.Now(ctx).UnixMilli()), + }) + } + } + persistenceManager.UnlockPersistence(saLoadingPolicy, doLoadingPolicy) if errStartApi != nil && !shouldProceedOnStartApiError(state) { return nil, service.FailureStateExecutionStatus, convertStateApiActivityError(provider, errStartApi) @@ -744,6 +822,18 @@ func invokeStateExecute( ctx = provider.WithActivityOptions(ctx, activityOptions) var decideResponse *iwfidl.WorkflowStateDecideResponse + + if !provider.IsReplaying(ctx) { + event.Handle(iwfidl.IwfEvent{ + EventType: iwfidl.STATE_EXECUTE_EE_START_EVENT, + WorkflowType: basicInfo.IwfWorkflowType, + WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID, + WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID, + StateId: ptr.Any(state.StateId), + StateExecutionId: ptr.Any(stateExeId), + }) + } + stateExecuteApiStartTime := provider.Now(ctx).UnixMilli() err = provider.ExecuteActivity(&decideResponse, configer.ShouldOptimizeActivity(), ctx, executeApi, provider.GetBackendType(), service.StateDecideActivityInput{ IwfWorkerUrl: basicInfo.IwfWorkerUrl, @@ -758,6 +848,30 @@ func invokeStateExecute( StateInput: state.StateInput, }, }) + if !provider.IsReplaying(ctx) { + if err == nil { + event.Handle(iwfidl.IwfEvent{ + EventType: iwfidl.STATE_EXECUTE_EE_COMPLETE_EVENT, + WorkflowType: basicInfo.IwfWorkflowType, + WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID, + WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID, + StateId: ptr.Any(state.StateId), + StateExecutionId: ptr.Any(stateExeId), + StartTimestampInMs: ptr.Any(stateExecuteApiStartTime), + EndTimestampInMs: ptr.Any(provider.Now(ctx).UnixMilli()), + }) + } else { + event.Handle(iwfidl.IwfEvent{ + EventType: iwfidl.STATE_EXECUTE_EE_FAIL_EVENT, + WorkflowType: basicInfo.IwfWorkflowType, + WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID, + WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID, + StateId: ptr.Any(state.StateId), + StateExecutionId: ptr.Any(stateExeId), + }) + } + } + persistenceManager.UnlockPersistence(saLoadingPolicy, doLoadingPolicy) if err == nil && shouldSendSignalOnCompletion && !provider.IsReplaying(ctx) { // NOTE: here uses NOT IsReplaying to signalWithStart, to save an activity for this operation