Skip to content

Commit

Permalink
Send original update request back in response (temporalio#1480)
Browse files Browse the repository at this point in the history
Server will use this value to reconstruct the update if it is lost in certain cases.
  • Loading branch information
Quinn-With-Two-Ns authored May 21, 2024
1 parent 222d4cf commit fb06909
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 7 deletions.
15 changes: 10 additions & 5 deletions internal/internal_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ type (
updateProtocol struct {
protoInstanceID string
clientIdentity string
initialRequest *updatepb.Request
requestMsgID string
requestSeqID int64
scheduleUpdate func(name string, id string, args *commonpb.Payloads, header *commonpb.Header, callbacks UpdateCallbacks)
Expand Down Expand Up @@ -134,15 +135,16 @@ func (up *updateProtocol) requireState(action string, valid ...updateState) {
}

func (up *updateProtocol) HandleMessage(msg *protocolpb.Message) error {
var req updatepb.Request
if err := msg.Body.UnmarshalTo(&req); err != nil {
var request updatepb.Request
if err := msg.Body.UnmarshalTo(&request); err != nil {
return err
}
up.initialRequest = &request
up.requireState("update request", updateStateNew)
up.requestMsgID = msg.GetId()
up.requestSeqID = msg.GetEventId()
input := req.GetInput()
up.scheduleUpdate(input.GetName(), req.GetMeta().GetUpdateId(), input.GetArgs(), input.GetHeader(), up)
input := up.initialRequest.GetInput()
up.scheduleUpdate(input.GetName(), up.initialRequest.GetMeta().GetUpdateId(), input.GetArgs(), input.GetHeader(), up)
up.state = updateStateRequestInitiated
return nil
}
Expand All @@ -157,8 +159,11 @@ func (up *updateProtocol) Accept() {
Body: protocol.MustMarshalAny(&updatepb.Acceptance{
AcceptedRequestMessageId: up.requestMsgID,
AcceptedRequestSequencingEventId: up.requestSeqID,
AcceptedRequest: up.initialRequest,
}),
}, withExpectedEventPredicate(up.checkAcceptedEvent))
// Stop holding a reference to the initial request to allow it to be GCed
up.initialRequest = nil
up.state = updateStateAccepted
}

Expand All @@ -171,8 +176,8 @@ func (up *updateProtocol) Reject(err error) {
Body: protocol.MustMarshalAny(&updatepb.Rejection{
RejectedRequestMessageId: up.requestMsgID,
RejectedRequestSequencingEventId: up.requestSeqID,
RejectedRequest: up.initialRequest,
Failure: up.env.GetFailureConverter().ErrorToFailure(err),
// RejectedRequest field no longer read by server - will be removed from API soon
}),
})
up.state = updateStateCompleted
Expand Down
4 changes: 2 additions & 2 deletions internal/internal_update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,8 +574,8 @@ func TestAcceptedEventPredicate(t *testing.T) {

var acptmsg updatepb.Acceptance
require.NoError(t, env.outbox[0].msg.Body.UnmarshalTo(&acptmsg))
require.Nil(t, acptmsg.AcceptedRequest,
"do not send the original request back - this field will be removed soon")
require.EqualExportedValues(t, &request, acptmsg.AcceptedRequest,
"Sent the original request back in the accepted message")

pred := env.outbox[0].eventPredicate
for _, tc := range [...]struct {
Expand Down

0 comments on commit fb06909

Please sign in to comment.