From 30d861c0b74fae2aefab1092663ddf1427c87768 Mon Sep 17 00:00:00 2001 From: Andrii Date: Mon, 28 Oct 2024 17:02:01 +0200 Subject: [PATCH 1/3] Changed endpoints message index to start at 0 --- access/handler.go | 8 ++++++-- engine/access/state_stream/backend/handler.go | 12 ++++++++++-- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/access/handler.go b/access/handler.go index 3007fc6f691..d4205d3d481 100644 --- a/access/handler.go +++ b/access/handler.go @@ -1435,15 +1435,19 @@ func (h *Handler) SendAndSubscribeTransactionStatuses( messageIndex := counters.NewMonotonousCounter(0) return subscription.HandleSubscription(sub, func(txResults []*TransactionResult) error { for i := range txResults { - value := messageIndex.Increment() + index := messageIndex.Value() err = stream.Send(&access.SendAndSubscribeTransactionStatusesResponse{ TransactionResults: TransactionResultToMessage(txResults[i]), - MessageIndex: value, + MessageIndex: index, }) if err != nil { return rpc.ConvertError(err, "could not send response", codes.Internal) } + + if ok := messageIndex.Set(index + 1); !ok { + return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value()) + } } return nil diff --git a/engine/access/state_stream/backend/handler.go b/engine/access/state_stream/backend/handler.go index 11281da9486..8ed2ca8051d 100644 --- a/engine/access/state_stream/backend/handler.go +++ b/engine/access/state_stream/backend/handler.go @@ -381,7 +381,7 @@ func (h *Handler) handleEventsResponse(send sendSubscribeEventsResponseFunc, hea return status.Errorf(codes.Internal, "could not convert events to entity: %v", err) } - index := messageIndex.Increment() + index := messageIndex.Value() err = send(&executiondata.SubscribeEventsResponse{ BlockHeight: resp.Height, @@ -394,6 +394,10 @@ func (h *Handler) handleEventsResponse(send sendSubscribeEventsResponseFunc, hea return rpc.ConvertError(err, "could not send response", codes.Internal) } + if ok := messageIndex.Set(index + 1); !ok { + return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value()) + } + return nil } } @@ -495,7 +499,7 @@ func (h *Handler) handleAccountStatusesResponse( return err } - index := messageIndex.Increment() + index := messageIndex.Value() err = send(&executiondata.SubscribeAccountStatusesResponse{ BlockId: convert.IdentifierToMessage(resp.BlockID), @@ -507,6 +511,10 @@ func (h *Handler) handleAccountStatusesResponse( return rpc.ConvertError(err, "could not send response", codes.Internal) } + if ok := messageIndex.Set(index + 1); !ok { + return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value()) + } + return nil } } From c41f1352c0ebb9aa5c41c320c0a9d893bf279a6e Mon Sep 17 00:00:00 2001 From: Andrii Date: Wed, 30 Oct 2024 17:08:27 +0200 Subject: [PATCH 2/3] Fixed integration test --- integration/tests/access/cohort1/access_api_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration/tests/access/cohort1/access_api_test.go b/integration/tests/access/cohort1/access_api_test.go index f070c1a4140..6470ced35b2 100644 --- a/integration/tests/access/cohort1/access_api_test.go +++ b/integration/tests/access/cohort1/access_api_test.go @@ -319,7 +319,7 @@ func (s *AccessAPISuite) TestSendAndSubscribeTransactionStatuses() { }) s.Require().NoError(err) - expectedCounter := uint64(1) + expectedCounter := uint64(0) lastReportedTxStatus := entities.TransactionStatus_UNKNOWN var txID sdk.Identifier From 299829dac5cff473e37798e6a8ffde4bb732d3dd Mon Sep 17 00:00:00 2001 From: Andrii Date: Tue, 5 Nov 2024 11:47:04 +0200 Subject: [PATCH 3/3] Moved check for index to return error instantly --- access/handler.go | 6 +++--- engine/access/state_stream/backend/handler.go | 14 ++++++-------- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/access/handler.go b/access/handler.go index d4205d3d481..25316e7f3dd 100644 --- a/access/handler.go +++ b/access/handler.go @@ -1436,6 +1436,9 @@ func (h *Handler) SendAndSubscribeTransactionStatuses( return subscription.HandleSubscription(sub, func(txResults []*TransactionResult) error { for i := range txResults { index := messageIndex.Value() + if ok := messageIndex.Set(index + 1); !ok { + return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value()) + } err = stream.Send(&access.SendAndSubscribeTransactionStatusesResponse{ TransactionResults: TransactionResultToMessage(txResults[i]), @@ -1445,9 +1448,6 @@ func (h *Handler) SendAndSubscribeTransactionStatuses( return rpc.ConvertError(err, "could not send response", codes.Internal) } - if ok := messageIndex.Set(index + 1); !ok { - return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value()) - } } return nil diff --git a/engine/access/state_stream/backend/handler.go b/engine/access/state_stream/backend/handler.go index 8ed2ca8051d..b2066440bb8 100644 --- a/engine/access/state_stream/backend/handler.go +++ b/engine/access/state_stream/backend/handler.go @@ -382,6 +382,9 @@ func (h *Handler) handleEventsResponse(send sendSubscribeEventsResponseFunc, hea } index := messageIndex.Value() + if ok := messageIndex.Set(index + 1); !ok { + return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value()) + } err = send(&executiondata.SubscribeEventsResponse{ BlockHeight: resp.Height, @@ -394,10 +397,6 @@ func (h *Handler) handleEventsResponse(send sendSubscribeEventsResponseFunc, hea return rpc.ConvertError(err, "could not send response", codes.Internal) } - if ok := messageIndex.Set(index + 1); !ok { - return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value()) - } - return nil } } @@ -500,6 +499,9 @@ func (h *Handler) handleAccountStatusesResponse( } index := messageIndex.Value() + if ok := messageIndex.Set(index + 1); !ok { + return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value()) + } err = send(&executiondata.SubscribeAccountStatusesResponse{ BlockId: convert.IdentifierToMessage(resp.BlockID), @@ -511,10 +513,6 @@ func (h *Handler) handleAccountStatusesResponse( return rpc.ConvertError(err, "could not send response", codes.Internal) } - if ok := messageIndex.Set(index + 1); !ok { - return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value()) - } - return nil } }