From fed835a831ebb7cb6771f62e354b3baa26e4b937 Mon Sep 17 00:00:00 2001 From: Andrii Date: Fri, 22 Nov 2024 16:15:49 +0200 Subject: [PATCH 01/30] Added skeleton for events data provider --- .../data_providers/events_provider.go | 146 ++++++++++++++++++ .../rest/websockets/data_providers/factory.go | 7 +- .../rest/websockets/models/event_models.go | 8 + 3 files changed, 158 insertions(+), 3 deletions(-) create mode 100644 engine/access/rest/websockets/data_providers/events_provider.go create mode 100644 engine/access/rest/websockets/models/event_models.go diff --git a/engine/access/rest/websockets/data_providers/events_provider.go b/engine/access/rest/websockets/data_providers/events_provider.go new file mode 100644 index 00000000000..2bc8612194a --- /dev/null +++ b/engine/access/rest/websockets/data_providers/events_provider.go @@ -0,0 +1,146 @@ +package data_providers + +import ( + "context" + "fmt" + + "github.com/rs/zerolog" + + "github.com/onflow/flow-go/engine/access/rest/common/parser" + "github.com/onflow/flow-go/engine/access/rest/http/request" + "github.com/onflow/flow-go/engine/access/rest/util" + "github.com/onflow/flow-go/engine/access/rest/websockets/models" + "github.com/onflow/flow-go/engine/access/state_stream" + "github.com/onflow/flow-go/engine/access/subscription" + "github.com/onflow/flow-go/model/flow" +) + +// EventsArguments contains the arguments required for subscribing to events +type EventsArguments struct { + StartBlockID flow.Identifier // ID of the block to start subscription from + StartBlockHeight uint64 // Height of the block to start subscription from + Filter state_stream.EventFilter // Filter applied to events for a given subscription +} + +// EventsDataProvider is responsible for providing events +type EventsDataProvider struct { + *BaseDataProviderImpl + + logger zerolog.Logger + args EventsArguments + stateStreamApi state_stream.API +} + +var _ DataProvider = (*EventsDataProvider)(nil) + +// NewEventsDataProvider creates a new instance of EventsDataProvider. +func NewEventsDataProvider( + ctx context.Context, + logger zerolog.Logger, + stateStreamApi state_stream.API, + topic string, + arguments map[string]string, + send chan<- interface{}, +) (*EventsDataProvider, error) { + p := &EventsDataProvider{ + logger: logger.With().Str("component", "events-data-provider").Logger(), + stateStreamApi: stateStreamApi, + } + + // Initialize arguments passed to the provider. + var err error + p.args, err = ParseEventsArguments(arguments) + if err != nil { + return nil, fmt.Errorf("invalid arguments for events data provider: %w", err) + } + + subCtx, cancel := context.WithCancel(ctx) + + // Set up a subscription to events based on arguments. + sub := p.createSubscription(subCtx) + + p.BaseDataProviderImpl = NewBaseDataProviderImpl( + cancel, + topic, + send, + sub, + ) + + return p, nil +} + +// Run starts processing the subscription for events and handles responses. +// +// No errors are expected during normal operations. +func (p *EventsDataProvider) Run() error { + return subscription.HandleSubscription(p.subscription, p.handleResponse(p.send)) +} + +// createSubscription creates a new subscription using the specified input arguments. +func (p *EventsDataProvider) createSubscription(ctx context.Context) subscription.Subscription { + if p.args.StartBlockID != flow.ZeroID && p.args.StartBlockHeight != request.EmptyHeight { + return p.stateStreamApi.SubscribeEvents(ctx, p.args.StartBlockID, p.args.StartBlockHeight, p.args.Filter) + } + + if p.args.StartBlockID != flow.ZeroID { + return p.stateStreamApi.SubscribeEventsFromStartBlockID(ctx, p.args.StartBlockID, p.args.Filter) + } + + if p.args.StartBlockHeight != request.EmptyHeight { + return p.stateStreamApi.SubscribeEventsFromStartHeight(ctx, p.args.StartBlockHeight, p.args.Filter) + } + + return p.stateStreamApi.SubscribeEventsFromLatest(ctx, p.args.Filter) +} + +// handleResponse processes an event and sends the formatted response. +// +// No errors are expected during normal operations. +func (p *EventsDataProvider) handleResponse(send chan<- interface{}) func(*flow.Event) error { + return func(event *flow.Event) error { + send <- &models.EventResponse{ + Event: event, + } + + return nil + } +} + +// ParseEventsArguments validates and initializes the events arguments. +func ParseEventsArguments(arguments map[string]string) (EventsArguments, error) { + var args EventsArguments + + // Parse + if eventStatusIn, ok := arguments["event_filter"]; ok { + eventFilter := parser.ParseEventFilter(eventStatusIn) + if err != nil { + return args, err + } + args.Filter = eventFilter + } else { + return args, fmt.Errorf("'event_filter' must be provided") + } + + // Parse 'start_block_id' if provided + if startBlockIDIn, ok := arguments["start_block_id"]; ok { + var startBlockID parser.ID + err := startBlockID.Parse(startBlockIDIn) + if err != nil { + return args, err + } + args.StartBlockID = startBlockID.Flow() + } + + // Parse 'start_block_height' if provided + if startBlockHeightIn, ok := arguments["start_block_height"]; ok { + var err error + args.StartBlockHeight, err = util.ToUint64(startBlockHeightIn) + if err != nil { + return args, fmt.Errorf("invalid 'start_block_height': %w", err) + } + } else { + args.StartBlockHeight = request.EmptyHeight + } + + return args, nil +} diff --git a/engine/access/rest/websockets/data_providers/factory.go b/engine/access/rest/websockets/data_providers/factory.go index 6ec8dd3185a..a92b689382d 100644 --- a/engine/access/rest/websockets/data_providers/factory.go +++ b/engine/access/rest/websockets/data_providers/factory.go @@ -73,9 +73,10 @@ func (s *DataProviderFactory) NewDataProvider( return NewBlockHeadersDataProvider(ctx, s.logger, s.accessApi, topic, arguments, ch) case BlockDigestsTopic: return NewBlockDigestsDataProvider(ctx, s.logger, s.accessApi, topic, arguments, ch) - // TODO: Implemented handlers for each topic should be added in respective case - case EventsTopic, - AccountStatusesTopic, + case EventsTopic: + return NewEventsDataProvider(ctx, s.logger, s.stateStreamApi, topic, arguments, ch) + // TODO: Implemented handlers for each topic should be added in respective case + case AccountStatusesTopic, TransactionStatusesTopic: return nil, fmt.Errorf("topic \"%s\" not implemented yet", topic) default: diff --git a/engine/access/rest/websockets/models/event_models.go b/engine/access/rest/websockets/models/event_models.go new file mode 100644 index 00000000000..9142c13a7a8 --- /dev/null +++ b/engine/access/rest/websockets/models/event_models.go @@ -0,0 +1,8 @@ +package models + +import "github.com/onflow/flow-go/model/flow" + +// EventResponse is the response message for 'events' topic. +type EventResponse struct { + Event *flow.Event `json:"event"` +} From 9c10d24c77b8958d0948b53d3d752e684359de92 Mon Sep 17 00:00:00 2001 From: Andrii Date: Fri, 22 Nov 2024 17:33:05 +0200 Subject: [PATCH 02/30] Added initializing of events filter, added missing data to factory --- .../data_providers/events_provider.go | 56 +++++++++++++------ .../data_providers/events_provider_test.go | 1 + .../rest/websockets/data_providers/factory.go | 14 +++-- 3 files changed, 50 insertions(+), 21 deletions(-) create mode 100644 engine/access/rest/websockets/data_providers/events_provider_test.go diff --git a/engine/access/rest/websockets/data_providers/events_provider.go b/engine/access/rest/websockets/data_providers/events_provider.go index 2bc8612194a..4da33bd4e64 100644 --- a/engine/access/rest/websockets/data_providers/events_provider.go +++ b/engine/access/rest/websockets/data_providers/events_provider.go @@ -3,6 +3,7 @@ package data_providers import ( "context" "fmt" + "strings" "github.com/rs/zerolog" @@ -38,6 +39,8 @@ func NewEventsDataProvider( ctx context.Context, logger zerolog.Logger, stateStreamApi state_stream.API, + chain flow.Chain, + eventFilterConfig state_stream.EventFilterConfig, topic string, arguments map[string]string, send chan<- interface{}, @@ -49,7 +52,7 @@ func NewEventsDataProvider( // Initialize arguments passed to the provider. var err error - p.args, err = ParseEventsArguments(arguments) + p.args, err = ParseEventsArguments(arguments, chain, eventFilterConfig) if err != nil { return nil, fmt.Errorf("invalid arguments for events data provider: %w", err) } @@ -60,8 +63,8 @@ func NewEventsDataProvider( sub := p.createSubscription(subCtx) p.BaseDataProviderImpl = NewBaseDataProviderImpl( - cancel, topic, + cancel, send, sub, ) @@ -78,10 +81,6 @@ func (p *EventsDataProvider) Run() error { // createSubscription creates a new subscription using the specified input arguments. func (p *EventsDataProvider) createSubscription(ctx context.Context) subscription.Subscription { - if p.args.StartBlockID != flow.ZeroID && p.args.StartBlockHeight != request.EmptyHeight { - return p.stateStreamApi.SubscribeEvents(ctx, p.args.StartBlockID, p.args.StartBlockHeight, p.args.Filter) - } - if p.args.StartBlockID != flow.ZeroID { return p.stateStreamApi.SubscribeEventsFromStartBlockID(ctx, p.args.StartBlockID, p.args.Filter) } @@ -107,24 +106,48 @@ func (p *EventsDataProvider) handleResponse(send chan<- interface{}) func(*flow. } // ParseEventsArguments validates and initializes the events arguments. -func ParseEventsArguments(arguments map[string]string) (EventsArguments, error) { +func ParseEventsArguments( + arguments map[string]string, + chain flow.Chain, + eventFilterConfig state_stream.EventFilterConfig, +) (EventsArguments, error) { var args EventsArguments - // Parse - if eventStatusIn, ok := arguments["event_filter"]; ok { - eventFilter := parser.ParseEventFilter(eventStatusIn) - if err != nil { - return args, err + // Parse 'event_types' as []string{} + var eventTypes []string + if eventTypesIn, ok := arguments["event_types"]; ok { + if eventTypesIn != "" { + eventTypes = strings.Split(eventTypesIn, ",") } - args.Filter = eventFilter - } else { - return args, fmt.Errorf("'event_filter' must be provided") } + // Parse 'addresses' as []string{} + var addresses []string + if addressesIn, ok := arguments["addresses"]; ok { + if addressesIn != "" { + addresses = strings.Split(addressesIn, ",") + } + } + + // Parse 'contracts' as []string{} + var contracts []string + if contractsIn, ok := arguments["contracts"]; ok { + if contractsIn != "" { + contracts = strings.Split(contractsIn, ",") + } + } + + // Initialize the event filter with the parsed arguments + filter, err := state_stream.NewEventFilter(eventFilterConfig, chain, eventTypes, addresses, contracts) + if err != nil { + return args, err + } + args.Filter = filter + // Parse 'start_block_id' if provided if startBlockIDIn, ok := arguments["start_block_id"]; ok { var startBlockID parser.ID - err := startBlockID.Parse(startBlockIDIn) + err = startBlockID.Parse(startBlockIDIn) if err != nil { return args, err } @@ -133,7 +156,6 @@ func ParseEventsArguments(arguments map[string]string) (EventsArguments, error) // Parse 'start_block_height' if provided if startBlockHeightIn, ok := arguments["start_block_height"]; ok { - var err error args.StartBlockHeight, err = util.ToUint64(startBlockHeightIn) if err != nil { return args, fmt.Errorf("invalid 'start_block_height': %w", err) diff --git a/engine/access/rest/websockets/data_providers/events_provider_test.go b/engine/access/rest/websockets/data_providers/events_provider_test.go new file mode 100644 index 00000000000..06387b46331 --- /dev/null +++ b/engine/access/rest/websockets/data_providers/events_provider_test.go @@ -0,0 +1 @@ +package data_providers diff --git a/engine/access/rest/websockets/data_providers/factory.go b/engine/access/rest/websockets/data_providers/factory.go index 465d407c431..4b45bbcccc1 100644 --- a/engine/access/rest/websockets/data_providers/factory.go +++ b/engine/access/rest/websockets/data_providers/factory.go @@ -3,6 +3,7 @@ package data_providers import ( "context" "fmt" + "github.com/onflow/flow-go/model/flow" "github.com/rs/zerolog" @@ -42,6 +43,9 @@ type DataProviderFactoryImpl struct { stateStreamApi state_stream.API accessApi access.API + + chain flow.Chain + eventFilterConfig state_stream.EventFilterConfig } // NewDataProviderFactory creates a new DataProviderFactory @@ -55,11 +59,13 @@ func NewDataProviderFactory( logger zerolog.Logger, stateStreamApi state_stream.API, accessApi access.API, + eventFilterConfig state_stream.EventFilterConfig, ) *DataProviderFactoryImpl { return &DataProviderFactoryImpl{ - logger: logger, - stateStreamApi: stateStreamApi, - accessApi: accessApi, + logger: logger, + stateStreamApi: stateStreamApi, + accessApi: accessApi, + eventFilterConfig: eventFilterConfig, } } @@ -87,7 +93,7 @@ func (s *DataProviderFactoryImpl) NewDataProvider( case BlockDigestsTopic: return NewBlockDigestsDataProvider(ctx, s.logger, s.accessApi, topic, arguments, ch) case EventsTopic: - return NewEventsDataProvider(ctx, s.logger, s.stateStreamApi, topic, arguments, ch) + return NewEventsDataProvider(ctx, s.logger, s.stateStreamApi, s.chain, s.eventFilterConfig, topic, arguments, ch) // TODO: Implemented handlers for each topic should be added in respective case case AccountStatusesTopic, TransactionStatusesTopic: From 9547b5d59cca97dfb74458f7b7e90d096e7305d2 Mon Sep 17 00:00:00 2001 From: Andrii Date: Fri, 22 Nov 2024 17:49:18 +0200 Subject: [PATCH 03/30] Fixed factory test --- engine/access/rest/server.go | 2 +- engine/access/rest/websockets/data_providers/factory.go | 4 +++- engine/access/rest/websockets/data_providers/factory_test.go | 5 ++++- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/engine/access/rest/server.go b/engine/access/rest/server.go index 4f0e2260ae5..71304db22a1 100644 --- a/engine/access/rest/server.go +++ b/engine/access/rest/server.go @@ -51,7 +51,7 @@ func NewServer(serverAPI access.API, builder.AddLegacyWebsocketsRoutes(stateStreamApi, chain, stateStreamConfig, config.MaxRequestSize) } - dataProviderFactory := dp.NewDataProviderFactory(logger, stateStreamApi, serverAPI) + dataProviderFactory := dp.NewDataProviderFactory(logger, stateStreamApi, serverAPI, chain, stateStreamConfig.EventFilterConfig) builder.AddWebsocketsRoute(chain, wsConfig, config.MaxRequestSize, dataProviderFactory) c := cors.New(cors.Options{ diff --git a/engine/access/rest/websockets/data_providers/factory.go b/engine/access/rest/websockets/data_providers/factory.go index 4b45bbcccc1..a0dd6ae5bbd 100644 --- a/engine/access/rest/websockets/data_providers/factory.go +++ b/engine/access/rest/websockets/data_providers/factory.go @@ -3,12 +3,12 @@ package data_providers import ( "context" "fmt" - "github.com/onflow/flow-go/model/flow" "github.com/rs/zerolog" "github.com/onflow/flow-go/access" "github.com/onflow/flow-go/engine/access/state_stream" + "github.com/onflow/flow-go/model/flow" ) // Constants defining various topic names used to specify different types of @@ -59,12 +59,14 @@ func NewDataProviderFactory( logger zerolog.Logger, stateStreamApi state_stream.API, accessApi access.API, + chain flow.Chain, eventFilterConfig state_stream.EventFilterConfig, ) *DataProviderFactoryImpl { return &DataProviderFactoryImpl{ logger: logger, stateStreamApi: stateStreamApi, accessApi: accessApi, + chain: chain, eventFilterConfig: eventFilterConfig, } } diff --git a/engine/access/rest/websockets/data_providers/factory_test.go b/engine/access/rest/websockets/data_providers/factory_test.go index ce4b16e97f6..1f8a7d488be 100644 --- a/engine/access/rest/websockets/data_providers/factory_test.go +++ b/engine/access/rest/websockets/data_providers/factory_test.go @@ -10,6 +10,7 @@ import ( accessmock "github.com/onflow/flow-go/access/mock" "github.com/onflow/flow-go/engine/access/rest/common/parser" + "github.com/onflow/flow-go/engine/access/state_stream" statestreammock "github.com/onflow/flow-go/engine/access/state_stream/mock" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/utils/unittest" @@ -42,7 +43,9 @@ func (s *DataProviderFactorySuite) SetupTest() { s.ctx = context.Background() s.ch = make(chan interface{}) - s.factory = NewDataProviderFactory(log, s.stateStreamApi, s.accessApi) + chain := flow.Testnet.Chain() + + s.factory = NewDataProviderFactory(log, s.stateStreamApi, s.accessApi, chain, state_stream.DefaultEventFilterConfig) s.Require().NotNil(s.factory) } From 0f34ae160f4f5ac1269a266dc0def2f348c89d7b Mon Sep 17 00:00:00 2001 From: Andrii Date: Mon, 25 Nov 2024 16:09:28 +0200 Subject: [PATCH 04/30] Added test skeleton for testing invalid arguments --- .../data_providers/events_provider.go | 71 ++++++------ .../data_providers/events_provider_test.go | 108 ++++++++++++++++++ 2 files changed, 146 insertions(+), 33 deletions(-) diff --git a/engine/access/rest/websockets/data_providers/events_provider.go b/engine/access/rest/websockets/data_providers/events_provider.go index 4da33bd4e64..6cf0f4daea1 100644 --- a/engine/access/rest/websockets/data_providers/events_provider.go +++ b/engine/access/rest/websockets/data_providers/events_provider.go @@ -113,50 +113,30 @@ func ParseEventsArguments( ) (EventsArguments, error) { var args EventsArguments - // Parse 'event_types' as []string{} - var eventTypes []string - if eventTypesIn, ok := arguments["event_types"]; ok { - if eventTypesIn != "" { - eventTypes = strings.Split(eventTypesIn, ",") - } - } + // Check for mutual exclusivity of start_block_id and start_block_height early + _, hasStartBlockID := arguments["start_block_id"] + _, hasStartBlockHeight := arguments["start_block_height"] - // Parse 'addresses' as []string{} - var addresses []string - if addressesIn, ok := arguments["addresses"]; ok { - if addressesIn != "" { - addresses = strings.Split(addressesIn, ",") - } + if hasStartBlockID && hasStartBlockHeight { + return args, fmt.Errorf("can only provide either 'start_block_id' or 'start_block_height'") } - // Parse 'contracts' as []string{} - var contracts []string - if contractsIn, ok := arguments["contracts"]; ok { - if contractsIn != "" { - contracts = strings.Split(contractsIn, ",") - } - } - - // Initialize the event filter with the parsed arguments - filter, err := state_stream.NewEventFilter(eventFilterConfig, chain, eventTypes, addresses, contracts) - if err != nil { - return args, err - } - args.Filter = filter - // Parse 'start_block_id' if provided - if startBlockIDIn, ok := arguments["start_block_id"]; ok { + if hasStartBlockID { var startBlockID parser.ID - err = startBlockID.Parse(startBlockIDIn) + err := startBlockID.Parse(arguments["start_block_id"]) if err != nil { - return args, err + return args, fmt.Errorf("invalid 'start_block_id': %w", err) } args.StartBlockID = startBlockID.Flow() + } else { + args.StartBlockID = flow.ZeroID } // Parse 'start_block_height' if provided - if startBlockHeightIn, ok := arguments["start_block_height"]; ok { - args.StartBlockHeight, err = util.ToUint64(startBlockHeightIn) + if hasStartBlockHeight { + var err error + args.StartBlockHeight, err = util.ToUint64(arguments["start_block_height"]) if err != nil { return args, fmt.Errorf("invalid 'start_block_height': %w", err) } @@ -164,5 +144,30 @@ func ParseEventsArguments( args.StartBlockHeight = request.EmptyHeight } + // Parse 'event_types' as []string{} + var eventTypes []string + if eventTypesIn, ok := arguments["event_types"]; ok && eventTypesIn != "" { + eventTypes = strings.Split(eventTypesIn, ",") + } + + // Parse 'addresses' as []string{} + var addresses []string + if addressesIn, ok := arguments["addresses"]; ok && addressesIn != "" { + addresses = strings.Split(addressesIn, ",") + } + + // Parse 'contracts' as []string{} + var contracts []string + if contractsIn, ok := arguments["contracts"]; ok && contractsIn != "" { + contracts = strings.Split(contractsIn, ",") + } + + // Initialize the event filter with the parsed arguments + filter, err := state_stream.NewEventFilter(eventFilterConfig, chain, eventTypes, addresses, contracts) + if err != nil { + return args, fmt.Errorf("failed to create event filter: %w", err) + } + args.Filter = filter + return args, nil } diff --git a/engine/access/rest/websockets/data_providers/events_provider_test.go b/engine/access/rest/websockets/data_providers/events_provider_test.go index 06387b46331..d3da592fcb9 100644 --- a/engine/access/rest/websockets/data_providers/events_provider_test.go +++ b/engine/access/rest/websockets/data_providers/events_provider_test.go @@ -1 +1,109 @@ package data_providers + +import ( + "context" + "fmt" + "testing" + + "github.com/rs/zerolog" + "github.com/stretchr/testify/suite" + + "github.com/onflow/flow-go/engine/access/state_stream" + ssmock "github.com/onflow/flow-go/engine/access/state_stream/mock" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/utils/unittest" +) + +// EventsProviderSuite is a test suite for testing the events providers functionality. +type EventsProviderSuite struct { + suite.Suite + + log zerolog.Logger + api *ssmock.API + + chain flow.Chain + rootBlock flow.Block + finalizedBlock *flow.Header +} + +func TestEventsProviderSuite(t *testing.T) { + suite.Run(t, new(EventsProviderSuite)) +} + +func (s *EventsProviderSuite) SetupTest() { + s.log = unittest.Logger() + s.api = ssmock.NewAPI(s.T()) + + s.chain = flow.Testnet.Chain() + + s.rootBlock = unittest.BlockFixture() + s.rootBlock.Header.Height = 0 +} + +// invalidArgumentsTestCases returns a list of test cases with invalid argument combinations +// for testing the behavior of events data providers. Each test case includes a name, +// a set of input arguments, and the expected error message that should be returned. +// +// The test cases cover scenarios such as: +// 1. Supplying both 'start_block_id' and 'start_block_height' simultaneously, which is not allowed. +// 2. Providing invalid 'start_block_id' value. +// 3. Providing invalid 'start_block_height' value. +func (s *EventsProviderSuite) invalidArgumentsTestCases() []testErrType { + return []testErrType{ + { + name: "provide both 'start_block_id' and 'start_block_height' arguments", + arguments: map[string]string{ + "start_block_id": s.rootBlock.ID().String(), + "start_block_height": fmt.Sprintf("%d", s.rootBlock.Header.Height), + }, + expectedErrorMsg: "can only provide either 'start_block_id' or 'start_block_height'", + }, + { + name: "invalid 'start_block_id' argument", + arguments: map[string]string{ + "start_block_id": "invalid_block_id", + }, + expectedErrorMsg: "invalid ID format", + }, + { + name: "invalid 'start_block_height' argument", + arguments: map[string]string{ + "start_block_height": "-1", + }, + expectedErrorMsg: "value must be an unsigned 64 bit integer", + }, + } +} + +// TestEventsDataProvider_InvalidArguments tests the behavior of the event data provider +// when invalid arguments are provided. It verifies that appropriate errors are returned +// for missing or conflicting arguments. +// This test covers the test cases: +// 1. Providing both 'start_block_id' and 'start_block_height' simultaneously. +// 2. Invalid 'start_block_id' argument. +// 3. Invalid 'start_block_height' argument. +func (s *EventsProviderSuite) TestEventsDataProvider_InvalidArguments() { + ctx := context.Background() + send := make(chan interface{}) + + topic := EventsTopic + + for _, test := range s.invalidArgumentsTestCases() { + s.Run(test.name, func() { + provider, err := NewEventsDataProvider( + ctx, + s.log, + s.api, + s.chain, + state_stream.DefaultEventFilterConfig, + topic, + test.arguments, + send) + s.Require().Nil(provider) + s.Require().Error(err) + s.Require().Contains(err.Error(), test.expectedErrorMsg) + }) + } +} + +// TODO: add tests for responses after the WebsocketController is ready From 411f9e5ee5bef0ee7df6612c26c8c08b7eeec999 Mon Sep 17 00:00:00 2001 From: Andrii Date: Tue, 26 Nov 2024 18:40:05 +0200 Subject: [PATCH 05/30] Added test for messageIndex check --- .../data_providers/events_provider.go | 14 +++- .../data_providers/events_provider_test.go | 74 ++++++++++++++++++- .../rest/websockets/models/event_models.go | 3 +- 3 files changed, 88 insertions(+), 3 deletions(-) diff --git a/engine/access/rest/websockets/data_providers/events_provider.go b/engine/access/rest/websockets/data_providers/events_provider.go index 6cf0f4daea1..f4e36bb5c11 100644 --- a/engine/access/rest/websockets/data_providers/events_provider.go +++ b/engine/access/rest/websockets/data_providers/events_provider.go @@ -3,9 +3,12 @@ package data_providers import ( "context" "fmt" + "strconv" "strings" "github.com/rs/zerolog" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "github.com/onflow/flow-go/engine/access/rest/common/parser" "github.com/onflow/flow-go/engine/access/rest/http/request" @@ -14,6 +17,7 @@ import ( "github.com/onflow/flow-go/engine/access/state_stream" "github.com/onflow/flow-go/engine/access/subscription" "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/counters" ) // EventsArguments contains the arguments required for subscribing to events @@ -96,9 +100,17 @@ func (p *EventsDataProvider) createSubscription(ctx context.Context) subscriptio // // No errors are expected during normal operations. func (p *EventsDataProvider) handleResponse(send chan<- interface{}) func(*flow.Event) error { + messageIndex := counters.NewMonotonousCounter(0) + return func(event *flow.Event) error { + if ok := messageIndex.Set(messageIndex.Value() + 1); !ok { + return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value()) + } + index := messageIndex.Value() + send <- &models.EventResponse{ - Event: event, + Event: event, + MessageIndex: strconv.FormatUint(index, 10), } return nil diff --git a/engine/access/rest/websockets/data_providers/events_provider_test.go b/engine/access/rest/websockets/data_providers/events_provider_test.go index d3da592fcb9..8ee8867dc54 100644 --- a/engine/access/rest/websockets/data_providers/events_provider_test.go +++ b/engine/access/rest/websockets/data_providers/events_provider_test.go @@ -3,11 +3,14 @@ package data_providers import ( "context" "fmt" + "strconv" "testing" "github.com/rs/zerolog" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" + "github.com/onflow/flow-go/engine/access/rest/websockets/models" "github.com/onflow/flow-go/engine/access/state_stream" ssmock "github.com/onflow/flow-go/engine/access/state_stream/mock" "github.com/onflow/flow-go/model/flow" @@ -106,4 +109,73 @@ func (s *EventsProviderSuite) TestEventsDataProvider_InvalidArguments() { } } -// TODO: add tests for responses after the WebsocketController is ready +func (s *EventsProviderSuite) TestMessageIndexEventProviderResponse_HappyPath() { + ctx := context.Background() + send := make(chan interface{}, 10) + topic := EventsTopic + eventsCount := 4 + + // Create a channel to simulate the subscription's event channel + eventChan := make(chan interface{}) + + // Create a mock subscription and mock the channel + sub := ssmock.NewSubscription(s.T()) + sub.On("Channel").Return((<-chan interface{})(eventChan)) + sub.On("Err").Return(nil) + + s.api.On("SubscribeEventsFromStartBlockID", mock.Anything, mock.Anything, mock.Anything).Return(sub) + + arguments := + map[string]string{ + "start_block_id": s.rootBlock.ID().String(), + } + + // Create the EventsDataProvider instance + provider, err := NewEventsDataProvider( + ctx, + s.log, + s.api, + s.chain, + state_stream.DefaultEventFilterConfig, + topic, + arguments, + send) + s.Require().NotNil(provider) + s.Require().NoError(err) + + // Run the provider in a separate goroutine to simulate subscription processing + go func() { + err = provider.Run() + s.Require().NoError(err) + }() + + // Simulate emitting events to the event channel + go func() { + defer close(eventChan) // Close the channel when done + + for i := 0; i < eventsCount; i++ { + eventChan <- &flow.Event{ + Type: "flow.AccountCreated", + } + } + }() + + // Collect responses + var responses []*models.EventResponse + for i := 0; i < eventsCount; i++ { + res := <-send + eventRes, ok := res.(*models.EventResponse) + s.Require().True(ok, "Expected *models.EventResponse, got %T", res) + responses = append(responses, eventRes) + } + + // Verifying that indices are strictly increasing + for i := 1; i < len(responses); i++ { + prevIndex, _ := strconv.Atoi(responses[i-1].MessageIndex) + currentIndex, _ := strconv.Atoi(responses[i].MessageIndex) + s.Require().Equal(prevIndex+1, currentIndex, "Expected MessageIndex to increment by 1") + } + + // Ensure the provider is properly closed after the test + provider.Close() +} diff --git a/engine/access/rest/websockets/models/event_models.go b/engine/access/rest/websockets/models/event_models.go index 9142c13a7a8..0569ebaf4ae 100644 --- a/engine/access/rest/websockets/models/event_models.go +++ b/engine/access/rest/websockets/models/event_models.go @@ -4,5 +4,6 @@ import "github.com/onflow/flow-go/model/flow" // EventResponse is the response message for 'events' topic. type EventResponse struct { - Event *flow.Event `json:"event"` + Event *flow.Event `json:"event"` + MessageIndex string `json:"message_index"` } From 636740a10eceab345bfa82ddda3349ecb04bbad9 Mon Sep 17 00:00:00 2001 From: Andrii Date: Thu, 28 Nov 2024 12:52:49 +0200 Subject: [PATCH 06/30] Added check for a valid event types in parse function --- .../{http/request => common/parser}/event_type.go | 2 +- engine/access/rest/http/request/get_events.go | 2 +- .../rest/websockets/data_providers/events_provider.go | 11 ++++++----- .../websockets/legacy/request/subscribe_events.go | 2 +- 4 files changed, 9 insertions(+), 8 deletions(-) rename engine/access/rest/{http/request => common/parser}/event_type.go (98%) diff --git a/engine/access/rest/http/request/event_type.go b/engine/access/rest/common/parser/event_type.go similarity index 98% rename from engine/access/rest/http/request/event_type.go rename to engine/access/rest/common/parser/event_type.go index c3f425d81c8..f1ba7ca1acb 100644 --- a/engine/access/rest/http/request/event_type.go +++ b/engine/access/rest/common/parser/event_type.go @@ -1,4 +1,4 @@ -package request +package parser import ( "fmt" diff --git a/engine/access/rest/http/request/get_events.go b/engine/access/rest/http/request/get_events.go index c864cf24a47..dee55f98ded 100644 --- a/engine/access/rest/http/request/get_events.go +++ b/engine/access/rest/http/request/get_events.go @@ -71,7 +71,7 @@ func (g *GetEvents) Parse(rawType string, rawStart string, rawEnd string, rawBlo if rawType == "" { return fmt.Errorf("event type must be provided") } - var eventType EventType + var eventType parser.EventType err = eventType.Parse(rawType) if err != nil { return err diff --git a/engine/access/rest/websockets/data_providers/events_provider.go b/engine/access/rest/websockets/data_providers/events_provider.go index f4e36bb5c11..240ede85b71 100644 --- a/engine/access/rest/websockets/data_providers/events_provider.go +++ b/engine/access/rest/websockets/data_providers/events_provider.go @@ -141,8 +141,6 @@ func ParseEventsArguments( return args, fmt.Errorf("invalid 'start_block_id': %w", err) } args.StartBlockID = startBlockID.Flow() - } else { - args.StartBlockID = flow.ZeroID } // Parse 'start_block_height' if provided @@ -157,9 +155,12 @@ func ParseEventsArguments( } // Parse 'event_types' as []string{} - var eventTypes []string + var eventTypes parser.EventTypes if eventTypesIn, ok := arguments["event_types"]; ok && eventTypesIn != "" { - eventTypes = strings.Split(eventTypesIn, ",") + err := eventTypes.Parse(strings.Split(eventTypesIn, ",")) + if err != nil { + return args, fmt.Errorf("invalid 'event_types': %w", err) + } } // Parse 'addresses' as []string{} @@ -175,7 +176,7 @@ func ParseEventsArguments( } // Initialize the event filter with the parsed arguments - filter, err := state_stream.NewEventFilter(eventFilterConfig, chain, eventTypes, addresses, contracts) + filter, err := state_stream.NewEventFilter(eventFilterConfig, chain, eventTypes.Flow(), addresses, contracts) if err != nil { return args, fmt.Errorf("failed to create event filter: %w", err) } diff --git a/engine/access/rest/websockets/legacy/request/subscribe_events.go b/engine/access/rest/websockets/legacy/request/subscribe_events.go index 1110d3582d4..9e53e7c5fca 100644 --- a/engine/access/rest/websockets/legacy/request/subscribe_events.go +++ b/engine/access/rest/websockets/legacy/request/subscribe_events.go @@ -81,7 +81,7 @@ func (g *SubscribeEvents) Parse( g.StartHeight = 0 } - var eventTypes request.EventTypes + var eventTypes parser.EventTypes err = eventTypes.Parse(rawTypes) if err != nil { return err From 588688eaa19e6f972cd29c336a3a6ab1210dbf5b Mon Sep 17 00:00:00 2001 From: Andrii Date: Thu, 28 Nov 2024 13:04:41 +0200 Subject: [PATCH 07/30] Changed type of arguments for consistency --- .../access/rest/websockets/data_providers/events_provider.go | 4 ++-- .../rest/websockets/data_providers/events_provider_test.go | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/engine/access/rest/websockets/data_providers/events_provider.go b/engine/access/rest/websockets/data_providers/events_provider.go index 240ede85b71..94856bab0ec 100644 --- a/engine/access/rest/websockets/data_providers/events_provider.go +++ b/engine/access/rest/websockets/data_providers/events_provider.go @@ -46,7 +46,7 @@ func NewEventsDataProvider( chain flow.Chain, eventFilterConfig state_stream.EventFilterConfig, topic string, - arguments map[string]string, + arguments models.Arguments, send chan<- interface{}, ) (*EventsDataProvider, error) { p := &EventsDataProvider{ @@ -119,7 +119,7 @@ func (p *EventsDataProvider) handleResponse(send chan<- interface{}) func(*flow. // ParseEventsArguments validates and initializes the events arguments. func ParseEventsArguments( - arguments map[string]string, + arguments models.Arguments, chain flow.Chain, eventFilterConfig state_stream.EventFilterConfig, ) (EventsArguments, error) { diff --git a/engine/access/rest/websockets/data_providers/events_provider_test.go b/engine/access/rest/websockets/data_providers/events_provider_test.go index 8ee8867dc54..063d54b4452 100644 --- a/engine/access/rest/websockets/data_providers/events_provider_test.go +++ b/engine/access/rest/websockets/data_providers/events_provider_test.go @@ -55,7 +55,7 @@ func (s *EventsProviderSuite) invalidArgumentsTestCases() []testErrType { return []testErrType{ { name: "provide both 'start_block_id' and 'start_block_height' arguments", - arguments: map[string]string{ + arguments: models.Arguments{ "start_block_id": s.rootBlock.ID().String(), "start_block_height": fmt.Sprintf("%d", s.rootBlock.Header.Height), }, @@ -109,6 +109,7 @@ func (s *EventsProviderSuite) TestEventsDataProvider_InvalidArguments() { } } +// TestMessageIndexEventProviderResponse_HappyPath tests that MessageIndex values in response are strictly increasing. func (s *EventsProviderSuite) TestMessageIndexEventProviderResponse_HappyPath() { ctx := context.Background() send := make(chan interface{}, 10) From b537a5f3e2e4d11caa814898f13943b36fad26db Mon Sep 17 00:00:00 2001 From: Andrii Date: Thu, 28 Nov 2024 13:08:22 +0200 Subject: [PATCH 08/30] Added test case for event provider in factory_test --- .../rest/websockets/data_providers/factory_test.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/engine/access/rest/websockets/data_providers/factory_test.go b/engine/access/rest/websockets/data_providers/factory_test.go index 7989d6c193c..6d39d1c924c 100644 --- a/engine/access/rest/websockets/data_providers/factory_test.go +++ b/engine/access/rest/websockets/data_providers/factory_test.go @@ -104,6 +104,17 @@ func (s *DataProviderFactorySuite) TestSupportedTopics() { s.accessApi.AssertExpectations(s.T()) }, }, + { + name: "events topic", + topic: EventsTopic, + arguments: models.Arguments{}, + setupSubscription: func() { + s.setupSubscription(s.stateStreamApi.On("SubscribeEventsFromLatest", mock.Anything, mock.Anything)) + }, + assertExpectations: func() { + s.stateStreamApi.AssertExpectations(s.T()) + }, + }, } for _, test := range testCases { From 4f63403c9a95ad2d5ab5d5d614bf88ff31c78b92 Mon Sep 17 00:00:00 2001 From: Andrii Date: Fri, 29 Nov 2024 18:32:27 +0200 Subject: [PATCH 09/30] Added implementations for account provider functions --- .../account_statuses_provider.go | 180 ++++++++++++++++++ .../account_statuses_provider_test.go | 1 + .../rest/websockets/models/account_models.go | 11 ++ 3 files changed, 192 insertions(+) create mode 100644 engine/access/rest/websockets/data_providers/account_statuses_provider.go create mode 100644 engine/access/rest/websockets/data_providers/account_statuses_provider_test.go create mode 100644 engine/access/rest/websockets/models/account_models.go diff --git a/engine/access/rest/websockets/data_providers/account_statuses_provider.go b/engine/access/rest/websockets/data_providers/account_statuses_provider.go new file mode 100644 index 00000000000..52b45ace9f6 --- /dev/null +++ b/engine/access/rest/websockets/data_providers/account_statuses_provider.go @@ -0,0 +1,180 @@ +package data_providers + +import ( + "context" + "fmt" + "strconv" + "strings" + + "github.com/rs/zerolog" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/onflow/flow-go/engine/access/rest/common/parser" + "github.com/onflow/flow-go/engine/access/rest/http/request" + "github.com/onflow/flow-go/engine/access/rest/util" + "github.com/onflow/flow-go/engine/access/rest/websockets/models" + "github.com/onflow/flow-go/engine/access/state_stream" + "github.com/onflow/flow-go/engine/access/state_stream/backend" + "github.com/onflow/flow-go/engine/access/subscription" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/counters" +) + +type AccountStatusesArguments struct { + StartBlockID flow.Identifier // ID of the block to start subscription from + StartBlockHeight uint64 // Height of the block to start subscription from + Filter state_stream.AccountStatusFilter // Filter applied to events for a given subscription +} + +type AccountStatusesDataProvider struct { + *BaseDataProviderImpl + + logger zerolog.Logger + args AccountStatusesArguments + stateStreamApi state_stream.API +} + +var _ DataProvider = (*AccountStatusesDataProvider)(nil) + +// NewAccountStatusesDataProvider creates a new instance of AccountStatusesDataProvider. +func NewAccountStatusesDataProvider( + ctx context.Context, + logger zerolog.Logger, + stateStreamApi state_stream.API, + chain flow.Chain, + eventFilterConfig state_stream.EventFilterConfig, + topic string, + arguments models.Arguments, + send chan<- interface{}, +) (*AccountStatusesDataProvider, error) { + p := &AccountStatusesDataProvider{ + logger: logger.With().Str("component", "account-statuses-data-provider").Logger(), + stateStreamApi: stateStreamApi, + } + + var err error + p.args, err = ParseAccountStatusesArguments(arguments, chain, eventFilterConfig) + if err != nil { + return nil, fmt.Errorf("invalid arguments for account statuses data provider: %w", err) + } + + subCtx, cancel := context.WithCancel(ctx) + + // Set up a subscription to events based on arguments. + sub := p.createSubscription(subCtx) + + p.BaseDataProviderImpl = NewBaseDataProviderImpl( + topic, + cancel, + send, + sub, + ) + + return p, nil +} + +// Run starts processing the subscription for events and handles responses. +// +// No errors are expected during normal operations. +func (p *AccountStatusesDataProvider) Run() error { + return subscription.HandleSubscription(p.subscription, p.handleResponse(p.send)) +} + +// createSubscription creates a new subscription using the specified input arguments. +func (p *AccountStatusesDataProvider) createSubscription(ctx context.Context) subscription.Subscription { + if p.args.StartBlockID != flow.ZeroID { + return p.stateStreamApi.SubscribeAccountStatusesFromStartBlockID(ctx, p.args.StartBlockID, p.args.Filter) + } + + if p.args.StartBlockHeight != request.EmptyHeight { + return p.stateStreamApi.SubscribeAccountStatusesFromStartHeight(ctx, p.args.StartBlockHeight, p.args.Filter) + } + + return p.stateStreamApi.SubscribeAccountStatusesFromLatestBlock(ctx, p.args.Filter) +} + +// handleResponse processes an account statuses and sends the formatted response. +// +// No errors are expected during normal operations. +func (p *AccountStatusesDataProvider) handleResponse(send chan<- interface{}) func(accountStatusesResponse *backend.AccountStatusesResponse) error { + messageIndex := counters.NewMonotonousCounter(0) + + return func(accountStatusesResponse *backend.AccountStatusesResponse) error { + if ok := messageIndex.Set(messageIndex.Value() + 1); !ok { + return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value()) + } + index := messageIndex.Value() + + send <- &models.AccountStatusesResponse{ + BlockID: accountStatusesResponse.BlockID.String(), + Height: strconv.FormatUint(accountStatusesResponse.Height, 10), + AccountEvents: accountStatusesResponse.AccountEvents, + MessageIndex: strconv.FormatUint(index, 10), + } + + return nil + } +} + +// ParseAccountStatusesArguments validates and initializes the account statuses arguments. +func ParseAccountStatusesArguments( + arguments models.Arguments, + chain flow.Chain, + eventFilterConfig state_stream.EventFilterConfig, +) (AccountStatusesArguments, error) { + var args AccountStatusesArguments + + // Check for mutual exclusivity of start_block_id and start_block_height early + _, hasStartBlockID := arguments["start_block_id"] + _, hasStartBlockHeight := arguments["start_block_height"] + + if hasStartBlockID && hasStartBlockHeight { + return args, fmt.Errorf("can only provide either 'start_block_id' or 'start_block_height'") + } + + // Parse 'start_block_id' if provided + if hasStartBlockID { + var startBlockID parser.ID + err := startBlockID.Parse(arguments["start_block_id"]) + if err != nil { + return args, fmt.Errorf("invalid 'start_block_id': %w", err) + } + args.StartBlockID = startBlockID.Flow() + } + + // Parse 'start_block_height' if provided + if hasStartBlockHeight { + var err error + args.StartBlockHeight, err = util.ToUint64(arguments["start_block_height"]) + if err != nil { + return args, fmt.Errorf("invalid 'start_block_height': %w", err) + } + } else { + args.StartBlockHeight = request.EmptyHeight + } + + // Parse 'event_types' as []string{} + var eventTypes parser.EventTypes + if eventTypesIn, ok := arguments["event_types"]; ok && eventTypesIn != "" { + err := eventTypes.Parse(strings.Split(eventTypesIn, ",")) + if err != nil { + return args, fmt.Errorf("invalid 'event_types': %w", err) + } + } + + // Parse 'accountAddresses' as []string{} + var accountAddresses []string + if addressesIn, ok := arguments["accountAddresses"]; ok && addressesIn != "" { + accountAddresses = strings.Split(addressesIn, ",") + } + + // Initialize the event filter with the parsed arguments + filter, err := state_stream.NewAccountStatusFilter(eventFilterConfig, chain, eventTypes.Flow(), accountAddresses) + if err != nil { + return args, fmt.Errorf("failed to create event filter: %w", err) + } + args.Filter = filter + + return args, nil +} diff --git a/engine/access/rest/websockets/data_providers/account_statuses_provider_test.go b/engine/access/rest/websockets/data_providers/account_statuses_provider_test.go new file mode 100644 index 00000000000..06387b46331 --- /dev/null +++ b/engine/access/rest/websockets/data_providers/account_statuses_provider_test.go @@ -0,0 +1 @@ +package data_providers diff --git a/engine/access/rest/websockets/models/account_models.go b/engine/access/rest/websockets/models/account_models.go new file mode 100644 index 00000000000..712f7a1be6a --- /dev/null +++ b/engine/access/rest/websockets/models/account_models.go @@ -0,0 +1,11 @@ +package models + +import "github.com/onflow/flow-go/model/flow" + +// AccountStatusesResponse is the response message for 'events' topic. +type AccountStatusesResponse struct { + BlockID string `json:"blockID"` + Height string `json:"height"` + AccountEvents map[string]flow.EventsList `json:"account_events"` + MessageIndex string `json:"message_index"` +} From dca9a253070314e2eff5faeac9ac883569c3f6bb Mon Sep 17 00:00:00 2001 From: Andrii Date: Fri, 29 Nov 2024 18:43:27 +0200 Subject: [PATCH 10/30] Fixed remarks --- .../data_providers/events_provider.go | 25 ++++++++----------- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/engine/access/rest/websockets/data_providers/events_provider.go b/engine/access/rest/websockets/data_providers/events_provider.go index 94856bab0ec..9ce7b50f03f 100644 --- a/engine/access/rest/websockets/data_providers/events_provider.go +++ b/engine/access/rest/websockets/data_providers/events_provider.go @@ -29,10 +29,9 @@ type EventsArguments struct { // EventsDataProvider is responsible for providing events type EventsDataProvider struct { - *BaseDataProviderImpl + *baseDataProvider logger zerolog.Logger - args EventsArguments stateStreamApi state_stream.API } @@ -55,22 +54,18 @@ func NewEventsDataProvider( } // Initialize arguments passed to the provider. - var err error - p.args, err = ParseEventsArguments(arguments, chain, eventFilterConfig) + eventArgs, err := ParseEventsArguments(arguments, chain, eventFilterConfig) if err != nil { return nil, fmt.Errorf("invalid arguments for events data provider: %w", err) } subCtx, cancel := context.WithCancel(ctx) - // Set up a subscription to events based on arguments. - sub := p.createSubscription(subCtx) - - p.BaseDataProviderImpl = NewBaseDataProviderImpl( + p.baseDataProvider = newBaseDataProvider( topic, cancel, send, - sub, + p.createSubscription(subCtx, eventArgs), // Set up a subscription to events based on arguments. ) return p, nil @@ -84,16 +79,16 @@ func (p *EventsDataProvider) Run() error { } // createSubscription creates a new subscription using the specified input arguments. -func (p *EventsDataProvider) createSubscription(ctx context.Context) subscription.Subscription { - if p.args.StartBlockID != flow.ZeroID { - return p.stateStreamApi.SubscribeEventsFromStartBlockID(ctx, p.args.StartBlockID, p.args.Filter) +func (p *EventsDataProvider) createSubscription(ctx context.Context, args EventsArguments) subscription.Subscription { + if args.StartBlockID != flow.ZeroID { + return p.stateStreamApi.SubscribeEventsFromStartBlockID(ctx, args.StartBlockID, args.Filter) } - if p.args.StartBlockHeight != request.EmptyHeight { - return p.stateStreamApi.SubscribeEventsFromStartHeight(ctx, p.args.StartBlockHeight, p.args.Filter) + if args.StartBlockHeight != request.EmptyHeight { + return p.stateStreamApi.SubscribeEventsFromStartHeight(ctx, args.StartBlockHeight, args.Filter) } - return p.stateStreamApi.SubscribeEventsFromLatest(ctx, p.args.Filter) + return p.stateStreamApi.SubscribeEventsFromLatest(ctx, args.Filter) } // handleResponse processes an event and sends the formatted response. From 9624894bdfb252dc2031784ad99719af6b4c402e Mon Sep 17 00:00:00 2001 From: Andrii Date: Fri, 29 Nov 2024 20:07:37 +0200 Subject: [PATCH 11/30] Added test for invalid arguments and for message index --- .../account_statuses_provider.go | 25 +-- .../account_statuses_provider_test.go | 180 ++++++++++++++++++ .../rest/websockets/data_providers/factory.go | 5 +- .../websockets/data_providers/factory_test.go | 11 ++ 4 files changed, 204 insertions(+), 17 deletions(-) diff --git a/engine/access/rest/websockets/data_providers/account_statuses_provider.go b/engine/access/rest/websockets/data_providers/account_statuses_provider.go index 52b45ace9f6..3e91abff03c 100644 --- a/engine/access/rest/websockets/data_providers/account_statuses_provider.go +++ b/engine/access/rest/websockets/data_providers/account_statuses_provider.go @@ -28,10 +28,9 @@ type AccountStatusesArguments struct { } type AccountStatusesDataProvider struct { - *BaseDataProviderImpl + *baseDataProvider logger zerolog.Logger - args AccountStatusesArguments stateStreamApi state_stream.API } @@ -53,22 +52,18 @@ func NewAccountStatusesDataProvider( stateStreamApi: stateStreamApi, } - var err error - p.args, err = ParseAccountStatusesArguments(arguments, chain, eventFilterConfig) + accountStatusesArgs, err := ParseAccountStatusesArguments(arguments, chain, eventFilterConfig) if err != nil { return nil, fmt.Errorf("invalid arguments for account statuses data provider: %w", err) } subCtx, cancel := context.WithCancel(ctx) - // Set up a subscription to events based on arguments. - sub := p.createSubscription(subCtx) - - p.BaseDataProviderImpl = NewBaseDataProviderImpl( + p.baseDataProvider = newBaseDataProvider( topic, cancel, send, - sub, + p.createSubscription(subCtx, accountStatusesArgs), // Set up a subscription to events based on arguments. ) return p, nil @@ -82,16 +77,16 @@ func (p *AccountStatusesDataProvider) Run() error { } // createSubscription creates a new subscription using the specified input arguments. -func (p *AccountStatusesDataProvider) createSubscription(ctx context.Context) subscription.Subscription { - if p.args.StartBlockID != flow.ZeroID { - return p.stateStreamApi.SubscribeAccountStatusesFromStartBlockID(ctx, p.args.StartBlockID, p.args.Filter) +func (p *AccountStatusesDataProvider) createSubscription(ctx context.Context, args AccountStatusesArguments) subscription.Subscription { + if args.StartBlockID != flow.ZeroID { + return p.stateStreamApi.SubscribeAccountStatusesFromStartBlockID(ctx, args.StartBlockID, args.Filter) } - if p.args.StartBlockHeight != request.EmptyHeight { - return p.stateStreamApi.SubscribeAccountStatusesFromStartHeight(ctx, p.args.StartBlockHeight, p.args.Filter) + if args.StartBlockHeight != request.EmptyHeight { + return p.stateStreamApi.SubscribeAccountStatusesFromStartHeight(ctx, args.StartBlockHeight, args.Filter) } - return p.stateStreamApi.SubscribeAccountStatusesFromLatestBlock(ctx, p.args.Filter) + return p.stateStreamApi.SubscribeAccountStatusesFromLatestBlock(ctx, args.Filter) } // handleResponse processes an account statuses and sends the formatted response. diff --git a/engine/access/rest/websockets/data_providers/account_statuses_provider_test.go b/engine/access/rest/websockets/data_providers/account_statuses_provider_test.go index 06387b46331..0034270f75f 100644 --- a/engine/access/rest/websockets/data_providers/account_statuses_provider_test.go +++ b/engine/access/rest/websockets/data_providers/account_statuses_provider_test.go @@ -1 +1,181 @@ package data_providers + +import ( + "context" + "fmt" + "github.com/onflow/flow-go/engine/access/state_stream/backend" + "strconv" + "testing" + + "github.com/rs/zerolog" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + + "github.com/onflow/flow-go/engine/access/rest/websockets/models" + "github.com/onflow/flow-go/engine/access/state_stream" + ssmock "github.com/onflow/flow-go/engine/access/state_stream/mock" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/utils/unittest" +) + +// AccountStatusesProviderSuite is a test suite for testing the account statuses providers functionality. +type AccountStatusesProviderSuite struct { + suite.Suite + + log zerolog.Logger + api *ssmock.API + + chain flow.Chain + rootBlock flow.Block + finalizedBlock *flow.Header +} + +func TestNewAccountStatusesDataProvider(t *testing.T) { + suite.Run(t, new(AccountStatusesProviderSuite)) +} + +func (s *AccountStatusesProviderSuite) SetupTest() { + s.log = unittest.Logger() + s.api = ssmock.NewAPI(s.T()) + + s.chain = flow.Testnet.Chain() + + s.rootBlock = unittest.BlockFixture() + s.rootBlock.Header.Height = 0 +} + +// invalidArgumentsTestCases returns a list of test cases with invalid argument combinations +// for testing the behavior of account statuses data providers. Each test case includes a name, +// a set of input arguments, and the expected error message that should be returned. +// +// The test cases cover scenarios such as: +// 1. Supplying both 'start_block_id' and 'start_block_height' simultaneously, which is not allowed. +// 2. Providing invalid 'start_block_id' value. +// 3. Providing invalid 'start_block_height' value. +func (s *AccountStatusesProviderSuite) invalidArgumentsTestCases() []testErrType { + return []testErrType{ + { + name: "provide both 'start_block_id' and 'start_block_height' arguments", + arguments: models.Arguments{ + "start_block_id": s.rootBlock.ID().String(), + "start_block_height": fmt.Sprintf("%d", s.rootBlock.Header.Height), + }, + expectedErrorMsg: "can only provide either 'start_block_id' or 'start_block_height'", + }, + { + name: "invalid 'start_block_id' argument", + arguments: map[string]string{ + "start_block_id": "invalid_block_id", + }, + expectedErrorMsg: "invalid ID format", + }, + { + name: "invalid 'start_block_height' argument", + arguments: map[string]string{ + "start_block_height": "-1", + }, + expectedErrorMsg: "value must be an unsigned 64 bit integer", + }, + } +} + +// TestAccountStatusesDataProvider_InvalidArguments tests the behavior of the account statuses data provider +// when invalid arguments are provided. It verifies that appropriate errors are returned +// for missing or conflicting arguments. +// This test covers the test cases: +// 1. Providing both 'start_block_id' and 'start_block_height' simultaneously. +// 2. Invalid 'start_block_id' argument. +// 3. Invalid 'start_block_height' argument. +func (s *AccountStatusesProviderSuite) TestAccountStatusesDataProvider_InvalidArguments() { + ctx := context.Background() + send := make(chan interface{}) + + topic := AccountStatusesTopic + + for _, test := range s.invalidArgumentsTestCases() { + s.Run(test.name, func() { + provider, err := NewAccountStatusesDataProvider( + ctx, + s.log, + s.api, + s.chain, + state_stream.DefaultEventFilterConfig, + topic, + test.arguments, + send) + s.Require().Nil(provider) + s.Require().Error(err) + s.Require().Contains(err.Error(), test.expectedErrorMsg) + }) + } +} + +// TestMessageIndexAccountStatusesProviderResponse_HappyPath tests that MessageIndex values in response are strictly increasing. +func (s *AccountStatusesProviderSuite) TestMessageIndexAccountStatusesProviderResponse_HappyPath() { + ctx := context.Background() + send := make(chan interface{}, 10) + topic := AccountStatusesTopic + accountStatusesCount := 4 + + // Create a channel to simulate the subscription's account statuses channel + accountStatusesChan := make(chan interface{}) + + // Create a mock subscription and mock the channel + sub := ssmock.NewSubscription(s.T()) + sub.On("Channel").Return((<-chan interface{})(accountStatusesChan)) + sub.On("Err").Return(nil) + + s.api.On("SubscribeAccountStatusesFromStartBlockID", mock.Anything, mock.Anything, mock.Anything).Return(sub) + + arguments := + map[string]string{ + "start_block_id": s.rootBlock.ID().String(), + } + + // Create the AccountStatusesDataProvider instance + provider, err := NewAccountStatusesDataProvider( + ctx, + s.log, + s.api, + s.chain, + state_stream.DefaultEventFilterConfig, + topic, + arguments, + send) + s.Require().NotNil(provider) + s.Require().NoError(err) + + // Run the provider in a separate goroutine to simulate subscription processing + go func() { + err = provider.Run() + s.Require().NoError(err) + }() + + // Simulate emitting data to the account statuses channel + go func() { + defer close(accountStatusesChan) // Close the channel when done + + for i := 0; i < accountStatusesCount; i++ { + accountStatusesChan <- &backend.AccountStatusesResponse{} + } + }() + + // Collect responses + var responses []*models.AccountStatusesResponse + for i := 0; i < accountStatusesCount; i++ { + res := <-send + accountStatusesRes, ok := res.(*models.AccountStatusesResponse) + s.Require().True(ok, "Expected *models.AccountStatusesResponse, got %T", res) + responses = append(responses, accountStatusesRes) + } + + // Verifying that indices are strictly increasing + for i := 1; i < len(responses); i++ { + prevIndex, _ := strconv.Atoi(responses[i-1].MessageIndex) + currentIndex, _ := strconv.Atoi(responses[i].MessageIndex) + s.Require().Equal(prevIndex+1, currentIndex, "Expected MessageIndex to increment by 1") + } + + // Ensure the provider is properly closed after the test + provider.Close() +} diff --git a/engine/access/rest/websockets/data_providers/factory.go b/engine/access/rest/websockets/data_providers/factory.go index 60af84784a4..6d1689c3977 100644 --- a/engine/access/rest/websockets/data_providers/factory.go +++ b/engine/access/rest/websockets/data_providers/factory.go @@ -97,9 +97,10 @@ func (s *DataProviderFactoryImpl) NewDataProvider( return NewBlockDigestsDataProvider(ctx, s.logger, s.accessApi, topic, arguments, ch) case EventsTopic: return NewEventsDataProvider(ctx, s.logger, s.stateStreamApi, s.chain, s.eventFilterConfig, topic, arguments, ch) + case AccountStatusesTopic: + return NewAccountStatusesDataProvider(ctx, s.logger, s.stateStreamApi, s.chain, s.eventFilterConfig, topic, arguments, ch) + case TransactionStatusesTopic: // TODO: Implemented handlers for each topic should be added in respective case - case AccountStatusesTopic, - TransactionStatusesTopic: return nil, fmt.Errorf(`topic "%s" not implemented yet`, topic) default: return nil, fmt.Errorf("unsupported topic \"%s\"", topic) diff --git a/engine/access/rest/websockets/data_providers/factory_test.go b/engine/access/rest/websockets/data_providers/factory_test.go index 6d39d1c924c..d7be970772c 100644 --- a/engine/access/rest/websockets/data_providers/factory_test.go +++ b/engine/access/rest/websockets/data_providers/factory_test.go @@ -115,6 +115,17 @@ func (s *DataProviderFactorySuite) TestSupportedTopics() { s.stateStreamApi.AssertExpectations(s.T()) }, }, + { + name: "account statuses topic", + topic: AccountStatusesTopic, + arguments: models.Arguments{}, + setupSubscription: func() { + s.setupSubscription(s.stateStreamApi.On("SubscribeAccountStatusesFromLatestBlock", mock.Anything, mock.Anything)) + }, + assertExpectations: func() { + s.stateStreamApi.AssertExpectations(s.T()) + }, + }, } for _, test := range testCases { From 309b148bd60e5628ae08a710400d901aa3449e5f Mon Sep 17 00:00:00 2001 From: Andrii Date: Mon, 2 Dec 2024 14:29:34 +0200 Subject: [PATCH 12/30] Added check for starting index value --- .../access/rest/websockets/data_providers/events_provider.go | 4 ++-- .../rest/websockets/data_providers/events_provider_test.go | 3 +++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/engine/access/rest/websockets/data_providers/events_provider.go b/engine/access/rest/websockets/data_providers/events_provider.go index 9ce7b50f03f..581f6e79242 100644 --- a/engine/access/rest/websockets/data_providers/events_provider.go +++ b/engine/access/rest/websockets/data_providers/events_provider.go @@ -95,13 +95,13 @@ func (p *EventsDataProvider) createSubscription(ctx context.Context, args Events // // No errors are expected during normal operations. func (p *EventsDataProvider) handleResponse(send chan<- interface{}) func(*flow.Event) error { - messageIndex := counters.NewMonotonousCounter(0) + messageIndex := counters.NewMonotonousCounter(1) return func(event *flow.Event) error { + index := messageIndex.Value() if ok := messageIndex.Set(messageIndex.Value() + 1); !ok { return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value()) } - index := messageIndex.Value() send <- &models.EventResponse{ Event: event, diff --git a/engine/access/rest/websockets/data_providers/events_provider_test.go b/engine/access/rest/websockets/data_providers/events_provider_test.go index 063d54b4452..d771f1fe58d 100644 --- a/engine/access/rest/websockets/data_providers/events_provider_test.go +++ b/engine/access/rest/websockets/data_providers/events_provider_test.go @@ -170,6 +170,9 @@ func (s *EventsProviderSuite) TestMessageIndexEventProviderResponse_HappyPath() responses = append(responses, eventRes) } + // Verifying that indices are starting from 1 + s.Require().Equal("1", responses[0].MessageIndex, "Expected MessageIndex to start with 1") + // Verifying that indices are strictly increasing for i := 1; i < len(responses); i++ { prevIndex, _ := strconv.Atoi(responses[i-1].MessageIndex) From 47a4c19470dcd23f16ce57c742a932eda6f76838 Mon Sep 17 00:00:00 2001 From: Andrii Date: Mon, 2 Dec 2024 15:10:49 +0200 Subject: [PATCH 13/30] Added check for msgIndex --- .../websockets/data_providers/account_statuses_provider.go | 4 ++-- .../data_providers/account_statuses_provider_test.go | 3 +++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/engine/access/rest/websockets/data_providers/account_statuses_provider.go b/engine/access/rest/websockets/data_providers/account_statuses_provider.go index 3e91abff03c..1656fb415a7 100644 --- a/engine/access/rest/websockets/data_providers/account_statuses_provider.go +++ b/engine/access/rest/websockets/data_providers/account_statuses_provider.go @@ -93,13 +93,13 @@ func (p *AccountStatusesDataProvider) createSubscription(ctx context.Context, ar // // No errors are expected during normal operations. func (p *AccountStatusesDataProvider) handleResponse(send chan<- interface{}) func(accountStatusesResponse *backend.AccountStatusesResponse) error { - messageIndex := counters.NewMonotonousCounter(0) + messageIndex := counters.NewMonotonousCounter(1) return func(accountStatusesResponse *backend.AccountStatusesResponse) error { + index := messageIndex.Value() if ok := messageIndex.Set(messageIndex.Value() + 1); !ok { return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value()) } - index := messageIndex.Value() send <- &models.AccountStatusesResponse{ BlockID: accountStatusesResponse.BlockID.String(), diff --git a/engine/access/rest/websockets/data_providers/account_statuses_provider_test.go b/engine/access/rest/websockets/data_providers/account_statuses_provider_test.go index 0034270f75f..b0d939ef8a8 100644 --- a/engine/access/rest/websockets/data_providers/account_statuses_provider_test.go +++ b/engine/access/rest/websockets/data_providers/account_statuses_provider_test.go @@ -169,6 +169,9 @@ func (s *AccountStatusesProviderSuite) TestMessageIndexAccountStatusesProviderRe responses = append(responses, accountStatusesRes) } + // Verifying that indices are starting from 1 + s.Require().Equal("1", responses[0].MessageIndex, "Expected MessageIndex to start with 1") + // Verifying that indices are strictly increasing for i := 1; i < len(responses); i++ { prevIndex, _ := strconv.Atoi(responses[i-1].MessageIndex) From 8ffe02381327bc126e09d3291b24f16c84596e8c Mon Sep 17 00:00:00 2001 From: Andrii Date: Mon, 2 Dec 2024 17:40:47 +0200 Subject: [PATCH 14/30] changed handleResponse to generic --- .../data_providers/blocks_provider_test.go | 3 +- .../data_providers/events_provider.go | 38 ++++++++----------- 2 files changed, 18 insertions(+), 23 deletions(-) diff --git a/engine/access/rest/websockets/data_providers/blocks_provider_test.go b/engine/access/rest/websockets/data_providers/blocks_provider_test.go index 6f46d27ccfe..a886a1474f1 100644 --- a/engine/access/rest/websockets/data_providers/blocks_provider_test.go +++ b/engine/access/rest/websockets/data_providers/blocks_provider_test.go @@ -15,6 +15,7 @@ import ( accessmock "github.com/onflow/flow-go/access/mock" "github.com/onflow/flow-go/engine/access/rest/common/parser" "github.com/onflow/flow-go/engine/access/rest/websockets/models" + "github.com/onflow/flow-go/engine/access/state_stream" statestreamsmock "github.com/onflow/flow-go/engine/access/state_stream/mock" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/utils/unittest" @@ -73,7 +74,7 @@ func (s *BlocksProviderSuite) SetupTest() { } s.finalizedBlock = parent - s.factory = NewDataProviderFactory(s.log, nil, s.api) + s.factory = NewDataProviderFactory(s.log, nil, s.api, flow.Testnet.Chain(), state_stream.DefaultEventFilterConfig) s.Require().NotNil(s.factory) } diff --git a/engine/access/rest/websockets/data_providers/events_provider.go b/engine/access/rest/websockets/data_providers/events_provider.go index 581f6e79242..9c787338ebe 100644 --- a/engine/access/rest/websockets/data_providers/events_provider.go +++ b/engine/access/rest/websockets/data_providers/events_provider.go @@ -75,7 +75,22 @@ func NewEventsDataProvider( // // No errors are expected during normal operations. func (p *EventsDataProvider) Run() error { - return subscription.HandleSubscription(p.subscription, p.handleResponse(p.send)) + messageIndex := counters.NewMonotonousCounter(1) + + return subscription.HandleSubscription( + p.subscription, + subscription.HandleResponse(p.send, func(event *flow.Event) (interface{}, error) { + index := messageIndex.Value() + if ok := messageIndex.Set(messageIndex.Value() + 1); !ok { + return nil, status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value()) + } + + return &models.EventResponse{ + Event: event, + MessageIndex: strconv.FormatUint(index, 10), + }, nil + })) + } // createSubscription creates a new subscription using the specified input arguments. @@ -91,27 +106,6 @@ func (p *EventsDataProvider) createSubscription(ctx context.Context, args Events return p.stateStreamApi.SubscribeEventsFromLatest(ctx, args.Filter) } -// handleResponse processes an event and sends the formatted response. -// -// No errors are expected during normal operations. -func (p *EventsDataProvider) handleResponse(send chan<- interface{}) func(*flow.Event) error { - messageIndex := counters.NewMonotonousCounter(1) - - return func(event *flow.Event) error { - index := messageIndex.Value() - if ok := messageIndex.Set(messageIndex.Value() + 1); !ok { - return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value()) - } - - send <- &models.EventResponse{ - Event: event, - MessageIndex: strconv.FormatUint(index, 10), - } - - return nil - } -} - // ParseEventsArguments validates and initializes the events arguments. func ParseEventsArguments( arguments models.Arguments, From 13844055f7f76a6f7715c4f59291b249a13ec083 Mon Sep 17 00:00:00 2001 From: Andrii Date: Tue, 3 Dec 2024 19:25:03 +0200 Subject: [PATCH 15/30] Added happy path for testing all subscribe methods --- .../data_providers/events_provider_test.go | 143 ++++++++++++++++++ 1 file changed, 143 insertions(+) diff --git a/engine/access/rest/websockets/data_providers/events_provider_test.go b/engine/access/rest/websockets/data_providers/events_provider_test.go index d771f1fe58d..48b967e4f40 100644 --- a/engine/access/rest/websockets/data_providers/events_provider_test.go +++ b/engine/access/rest/websockets/data_providers/events_provider_test.go @@ -3,8 +3,10 @@ package data_providers import ( "context" "fmt" + "github.com/stretchr/testify/require" "strconv" "testing" + "time" "github.com/rs/zerolog" "github.com/stretchr/testify/mock" @@ -27,6 +29,8 @@ type EventsProviderSuite struct { chain flow.Chain rootBlock flow.Block finalizedBlock *flow.Header + + factory *DataProviderFactoryImpl } func TestEventsProviderSuite(t *testing.T) { @@ -41,6 +45,145 @@ func (s *EventsProviderSuite) SetupTest() { s.rootBlock = unittest.BlockFixture() s.rootBlock.Header.Height = 0 + + s.factory = NewDataProviderFactory(s.log, s.api, nil, flow.Testnet.Chain(), state_stream.DefaultEventFilterConfig) + s.Require().NotNil(s.factory) +} + +// subscribeEventsDataProviderTestCases generates test cases for events data providers. +func (s *EventsProviderSuite) subscribeEventsDataProviderTestCases() []testType { + return []testType{ + { + name: "SubscribeBlocksFromStartBlockID happy path", + arguments: models.Arguments{ + "start_block_id": s.rootBlock.ID().String(), + }, + setupBackend: func(sub *ssmock.Subscription) { + s.api.On( + "SubscribeEventsFromStartBlockID", + mock.Anything, + s.rootBlock.ID(), + mock.Anything, + ).Return(sub).Once() + }, + }, + { + name: "SubscribeEventsFromStartHeight happy path", + arguments: models.Arguments{ + "start_block_height": strconv.FormatUint(s.rootBlock.Header.Height, 10), + }, + setupBackend: func(sub *ssmock.Subscription) { + s.api.On( + "SubscribeEventsFromStartHeight", + mock.Anything, + s.rootBlock.Header.Height, + mock.Anything, + ).Return(sub).Once() + }, + }, + { + name: "SubscribeEventsFromLatest happy path", + arguments: models.Arguments{}, + setupBackend: func(sub *ssmock.Subscription) { + s.api.On( + "SubscribeEventsFromLatest", + mock.Anything, + mock.Anything, + ).Return(sub).Once() + }, + }, + } +} + +// TestEventsDataProvider_HappyPath tests the behavior of the events data provider +// when it is configured correctly and operating under normal conditions. It +// validates that events are correctly streamed to the channel and ensures +// no unexpected errors occur. +func (s *EventsProviderSuite) TestEventsDataProvider_HappyPath() { + s.testHappyPath( + EventsTopic, + s.subscribeEventsDataProviderTestCases(), + s.requireEvents, + ) +} + +// testHappyPath tests a variety of scenarios for data providers in +// happy path scenarios. This function runs parameterized test cases that +// simulate various configurations and verifies that the data provider operates +// as expected without encountering errors. +// +// Arguments: +// - topic: The topic associated with the data provider. +// - tests: A slice of test cases to run, each specifying setup and validation logic. +// - requireFn: A function to validate the output received in the send channel. +func (s *EventsProviderSuite) testHappyPath( + topic string, + tests []testType, + requireFn func(interface{}, *flow.Event), +) { + expectedEvents := []flow.Event{ + unittest.EventFixture(flow.EventAccountCreated, 0, 0, unittest.IdentifierFixture(), 0), + unittest.EventFixture(flow.EventAccountUpdated, 0, 0, unittest.IdentifierFixture(), 0), + unittest.EventFixture(flow.EventAccountCreated, 0, 0, unittest.IdentifierFixture(), 0), + unittest.EventFixture(flow.EventAccountUpdated, 0, 0, unittest.IdentifierFixture(), 0), + } + + for _, test := range tests { + s.Run(test.name, func() { + ctx := context.Background() + send := make(chan interface{}, 10) + + // Create a channel to simulate the subscription's data channel + eventChan := make(chan interface{}) + + // // Create a mock subscription and mock the channel + sub := ssmock.NewSubscription(s.T()) + sub.On("Channel").Return((<-chan interface{})(eventChan)) + sub.On("Err").Return(nil) + test.setupBackend(sub) + + // Create the data provider instance + provider, err := s.factory.NewDataProvider(ctx, topic, test.arguments, send) + s.Require().NotNil(provider) + s.Require().NoError(err) + + // Run the provider in a separate goroutine + go func() { + err = provider.Run() + s.Require().NoError(err) + }() + + // Simulate emitting data to the events channel + go func() { + defer close(eventChan) + + for i := 0; i < len(expectedEvents); i++ { + eventChan <- &expectedEvents[i] + } + }() + + // Collect responses + for _, e := range expectedEvents { + unittest.RequireReturnsBefore(s.T(), func() { + v, ok := <-send + s.Require().True(ok, "channel closed while waiting for event %v: err: %v", e.ID(), sub.Err()) + + requireFn(v, &e) + }, time.Second, fmt.Sprintf("timed out waiting for event %v ", e.ID())) + } + + // Ensure the provider is properly closed after the test + provider.Close() + }) + } +} + +// requireEvents ensures that the received event information matches the expected data. +func (s *EventsProviderSuite) requireEvents(v interface{}, expectedEvent *flow.Event) { + actualResponse, ok := v.(*models.EventResponse) + require.True(s.T(), ok, "Expected *models.EventResponse, got %T", v) + + s.Require().Equal(expectedEvent, actualResponse.Event) } // invalidArgumentsTestCases returns a list of test cases with invalid argument combinations From 57a7c0f37af43cec6f6379a3ee480c760639d9f6 Mon Sep 17 00:00:00 2001 From: Andrii Date: Tue, 3 Dec 2024 20:45:29 +0200 Subject: [PATCH 16/30] Linted --- .../rest/websockets/data_providers/events_provider_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/engine/access/rest/websockets/data_providers/events_provider_test.go b/engine/access/rest/websockets/data_providers/events_provider_test.go index 48b967e4f40..66e1b05f3bd 100644 --- a/engine/access/rest/websockets/data_providers/events_provider_test.go +++ b/engine/access/rest/websockets/data_providers/events_provider_test.go @@ -3,13 +3,13 @@ package data_providers import ( "context" "fmt" - "github.com/stretchr/testify/require" "strconv" "testing" "time" "github.com/rs/zerolog" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/onflow/flow-go/engine/access/rest/websockets/models" From e076072a4469e51b8bddc029d8111e6d673ea8c5 Mon Sep 17 00:00:00 2001 From: Andrii Date: Wed, 4 Dec 2024 12:41:43 +0200 Subject: [PATCH 17/30] Changed order of params --- .../access/rest/websockets/data_providers/events_provider.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/engine/access/rest/websockets/data_providers/events_provider.go b/engine/access/rest/websockets/data_providers/events_provider.go index 9c787338ebe..71a7ef7e67c 100644 --- a/engine/access/rest/websockets/data_providers/events_provider.go +++ b/engine/access/rest/websockets/data_providers/events_provider.go @@ -42,11 +42,11 @@ func NewEventsDataProvider( ctx context.Context, logger zerolog.Logger, stateStreamApi state_stream.API, - chain flow.Chain, - eventFilterConfig state_stream.EventFilterConfig, topic string, arguments models.Arguments, send chan<- interface{}, + chain flow.Chain, + eventFilterConfig state_stream.EventFilterConfig, ) (*EventsDataProvider, error) { p := &EventsDataProvider{ logger: logger.With().Str("component", "events-data-provider").Logger(), From 89bc4c23ef9de2747beff8e69686928d94b7ed1b Mon Sep 17 00:00:00 2001 From: Andrii Date: Wed, 4 Dec 2024 12:46:20 +0200 Subject: [PATCH 18/30] Fixed issues with params order --- .../data_providers/events_provider_test.go | 12 ++++++------ .../access/rest/websockets/data_providers/factory.go | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/engine/access/rest/websockets/data_providers/events_provider_test.go b/engine/access/rest/websockets/data_providers/events_provider_test.go index 66e1b05f3bd..7e5973be34a 100644 --- a/engine/access/rest/websockets/data_providers/events_provider_test.go +++ b/engine/access/rest/websockets/data_providers/events_provider_test.go @@ -240,11 +240,11 @@ func (s *EventsProviderSuite) TestEventsDataProvider_InvalidArguments() { ctx, s.log, s.api, - s.chain, - state_stream.DefaultEventFilterConfig, topic, test.arguments, - send) + send, + s.chain, + state_stream.DefaultEventFilterConfig) s.Require().Nil(provider) s.Require().Error(err) s.Require().Contains(err.Error(), test.expectedErrorMsg) @@ -279,11 +279,11 @@ func (s *EventsProviderSuite) TestMessageIndexEventProviderResponse_HappyPath() ctx, s.log, s.api, - s.chain, - state_stream.DefaultEventFilterConfig, topic, arguments, - send) + send, + s.chain, + state_stream.DefaultEventFilterConfig) s.Require().NotNil(provider) s.Require().NoError(err) diff --git a/engine/access/rest/websockets/data_providers/factory.go b/engine/access/rest/websockets/data_providers/factory.go index 60af84784a4..fc764e4c7fe 100644 --- a/engine/access/rest/websockets/data_providers/factory.go +++ b/engine/access/rest/websockets/data_providers/factory.go @@ -96,7 +96,7 @@ func (s *DataProviderFactoryImpl) NewDataProvider( case BlockDigestsTopic: return NewBlockDigestsDataProvider(ctx, s.logger, s.accessApi, topic, arguments, ch) case EventsTopic: - return NewEventsDataProvider(ctx, s.logger, s.stateStreamApi, s.chain, s.eventFilterConfig, topic, arguments, ch) + return NewEventsDataProvider(ctx, s.logger, s.stateStreamApi, topic, arguments, ch, s.chain, s.eventFilterConfig) // TODO: Implemented handlers for each topic should be added in respective case case AccountStatusesTopic, TransactionStatusesTopic: From a1d7aa7c5bfc89c50490ae25e36f94f4acf61ce0 Mon Sep 17 00:00:00 2001 From: Andrii Date: Wed, 4 Dec 2024 12:49:59 +0200 Subject: [PATCH 19/30] Refactored parse function --- .../rest/websockets/data_providers/events_provider.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/engine/access/rest/websockets/data_providers/events_provider.go b/engine/access/rest/websockets/data_providers/events_provider.go index 71a7ef7e67c..f85430aaa7c 100644 --- a/engine/access/rest/websockets/data_providers/events_provider.go +++ b/engine/access/rest/websockets/data_providers/events_provider.go @@ -115,8 +115,8 @@ func ParseEventsArguments( var args EventsArguments // Check for mutual exclusivity of start_block_id and start_block_height early - _, hasStartBlockID := arguments["start_block_id"] - _, hasStartBlockHeight := arguments["start_block_height"] + startBlockIDIn, hasStartBlockID := arguments["start_block_id"] + startBlockHeightIn, hasStartBlockHeight := arguments["start_block_height"] if hasStartBlockID && hasStartBlockHeight { return args, fmt.Errorf("can only provide either 'start_block_id' or 'start_block_height'") @@ -125,7 +125,7 @@ func ParseEventsArguments( // Parse 'start_block_id' if provided if hasStartBlockID { var startBlockID parser.ID - err := startBlockID.Parse(arguments["start_block_id"]) + err := startBlockID.Parse(startBlockIDIn) if err != nil { return args, fmt.Errorf("invalid 'start_block_id': %w", err) } @@ -135,7 +135,7 @@ func ParseEventsArguments( // Parse 'start_block_height' if provided if hasStartBlockHeight { var err error - args.StartBlockHeight, err = util.ToUint64(arguments["start_block_height"]) + args.StartBlockHeight, err = util.ToUint64(startBlockHeightIn) if err != nil { return args, fmt.Errorf("invalid 'start_block_height': %w", err) } From 8d9e0902d25b72b80ec304b8fb7eac28139f6671 Mon Sep 17 00:00:00 2001 From: Andrii Date: Wed, 4 Dec 2024 13:19:56 +0200 Subject: [PATCH 20/30] Using json arrays instead of comma separeted lists --- .../data_providers/events_provider.go | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/engine/access/rest/websockets/data_providers/events_provider.go b/engine/access/rest/websockets/data_providers/events_provider.go index f85430aaa7c..f1b3bc7876b 100644 --- a/engine/access/rest/websockets/data_providers/events_provider.go +++ b/engine/access/rest/websockets/data_providers/events_provider.go @@ -2,6 +2,7 @@ package data_providers import ( "context" + "encoding/json" "fmt" "strconv" "strings" @@ -143,10 +144,15 @@ func ParseEventsArguments( args.StartBlockHeight = request.EmptyHeight } - // Parse 'event_types' as []string{} var eventTypes parser.EventTypes + // Parse 'event_types' as []string{} if eventTypesIn, ok := arguments["event_types"]; ok && eventTypesIn != "" { - err := eventTypes.Parse(strings.Split(eventTypesIn, ",")) + err := json.Unmarshal([]byte(eventTypesIn), &eventTypes) // Expect a JSON array + if err != nil { + return args, fmt.Errorf("could not parse 'event_types': %w", err) + } + + err = eventTypes.Parse(strings.Split(eventTypesIn, ",")) if err != nil { return args, fmt.Errorf("invalid 'event_types': %w", err) } @@ -155,13 +161,19 @@ func ParseEventsArguments( // Parse 'addresses' as []string{} var addresses []string if addressesIn, ok := arguments["addresses"]; ok && addressesIn != "" { - addresses = strings.Split(addressesIn, ",") + err := json.Unmarshal([]byte(addressesIn), &addresses) // Expect a JSON array + if err != nil { + return args, fmt.Errorf("could not parse 'addresses': %w", err) + } } // Parse 'contracts' as []string{} var contracts []string if contractsIn, ok := arguments["contracts"]; ok && contractsIn != "" { - contracts = strings.Split(contractsIn, ",") + err := json.Unmarshal([]byte(contractsIn), &contracts) // Expect a JSON array + if err != nil { + return args, fmt.Errorf("could not parse 'contracts': %w", err) + } } // Initialize the event filter with the parsed arguments From 416ff58d728c22755824602737d5c235a34057dd Mon Sep 17 00:00:00 2001 From: Andrii Date: Wed, 4 Dec 2024 19:28:18 +0200 Subject: [PATCH 21/30] Added heartbeat handling in handleResponse, updated type of expected response, updated tests --- engine/access/rest/server.go | 8 ++- .../data_providers/blocks_provider_test.go | 8 ++- .../data_providers/events_provider.go | 48 ++++++++++++----- .../data_providers/events_provider_test.go | 53 +++++++++++++------ .../rest/websockets/data_providers/factory.go | 5 +- .../websockets/data_providers/factory_test.go | 9 +++- .../rest/websockets/models/event_models.go | 13 +++-- 7 files changed, 107 insertions(+), 37 deletions(-) diff --git a/engine/access/rest/server.go b/engine/access/rest/server.go index 71304db22a1..c45919725b2 100644 --- a/engine/access/rest/server.go +++ b/engine/access/rest/server.go @@ -51,7 +51,13 @@ func NewServer(serverAPI access.API, builder.AddLegacyWebsocketsRoutes(stateStreamApi, chain, stateStreamConfig, config.MaxRequestSize) } - dataProviderFactory := dp.NewDataProviderFactory(logger, stateStreamApi, serverAPI, chain, stateStreamConfig.EventFilterConfig) + dataProviderFactory := dp.NewDataProviderFactory( + logger, + stateStreamApi, + serverAPI, + chain, + stateStreamConfig.EventFilterConfig, + stateStreamConfig.HeartbeatInterval) builder.AddWebsocketsRoute(chain, wsConfig, config.MaxRequestSize, dataProviderFactory) c := cors.New(cors.Options{ diff --git a/engine/access/rest/websockets/data_providers/blocks_provider_test.go b/engine/access/rest/websockets/data_providers/blocks_provider_test.go index 23523f647ee..c82ec6d47d0 100644 --- a/engine/access/rest/websockets/data_providers/blocks_provider_test.go +++ b/engine/access/rest/websockets/data_providers/blocks_provider_test.go @@ -17,6 +17,7 @@ import ( "github.com/onflow/flow-go/engine/access/rest/websockets/models" "github.com/onflow/flow-go/engine/access/state_stream" statestreamsmock "github.com/onflow/flow-go/engine/access/state_stream/mock" + "github.com/onflow/flow-go/engine/access/subscription" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/utils/unittest" ) @@ -74,7 +75,12 @@ func (s *BlocksProviderSuite) SetupTest() { } s.finalizedBlock = parent - s.factory = NewDataProviderFactory(s.log, nil, s.api, flow.Testnet.Chain(), state_stream.DefaultEventFilterConfig) + s.factory = NewDataProviderFactory(s.log, + nil, + s.api, + flow.Testnet.Chain(), + state_stream.DefaultEventFilterConfig, + subscription.DefaultHeartbeatInterval) s.Require().NotNil(s.factory) } diff --git a/engine/access/rest/websockets/data_providers/events_provider.go b/engine/access/rest/websockets/data_providers/events_provider.go index f1b3bc7876b..822b22dad3b 100644 --- a/engine/access/rest/websockets/data_providers/events_provider.go +++ b/engine/access/rest/websockets/data_providers/events_provider.go @@ -16,6 +16,7 @@ import ( "github.com/onflow/flow-go/engine/access/rest/util" "github.com/onflow/flow-go/engine/access/rest/websockets/models" "github.com/onflow/flow-go/engine/access/state_stream" + "github.com/onflow/flow-go/engine/access/state_stream/backend" "github.com/onflow/flow-go/engine/access/subscription" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/counters" @@ -34,6 +35,8 @@ type EventsDataProvider struct { logger zerolog.Logger stateStreamApi state_stream.API + + heartbeatInterval uint64 } var _ DataProvider = (*EventsDataProvider)(nil) @@ -48,10 +51,12 @@ func NewEventsDataProvider( send chan<- interface{}, chain flow.Chain, eventFilterConfig state_stream.EventFilterConfig, + heartbeatInterval uint64, ) (*EventsDataProvider, error) { p := &EventsDataProvider{ - logger: logger.With().Str("component", "events-data-provider").Logger(), - stateStreamApi: stateStreamApi, + logger: logger.With().Str("component", "events-data-provider").Logger(), + stateStreamApi: stateStreamApi, + heartbeatInterval: heartbeatInterval, } // Initialize arguments passed to the provider. @@ -76,22 +81,39 @@ func NewEventsDataProvider( // // No errors are expected during normal operations. func (p *EventsDataProvider) Run() error { + return subscription.HandleSubscription(p.subscription, p.handleResponse(p.send)) +} + +func (p *EventsDataProvider) handleResponse(send chan<- interface{}) func(eventsResponse *backend.EventsResponse) error { + blocksSinceLastMessage := uint64(0) messageIndex := counters.NewMonotonousCounter(1) - return subscription.HandleSubscription( - p.subscription, - subscription.HandleResponse(p.send, func(event *flow.Event) (interface{}, error) { - index := messageIndex.Value() - if ok := messageIndex.Set(messageIndex.Value() + 1); !ok { - return nil, status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value()) + return func(eventsResponse *backend.EventsResponse) error { + // check if there are any events in the response. if not, do not send a message unless the last + // response was more than HeartbeatInterval blocks ago + if len(eventsResponse.Events) == 0 { + blocksSinceLastMessage++ + if blocksSinceLastMessage < p.heartbeatInterval { + return nil } + blocksSinceLastMessage = 0 + } - return &models.EventResponse{ - Event: event, - MessageIndex: strconv.FormatUint(index, 10), - }, nil - })) + index := messageIndex.Value() + if ok := messageIndex.Set(messageIndex.Value() + 1); !ok { + return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value()) + } + + send <- &models.EventResponse{ + BlockId: eventsResponse.BlockID.String(), + BlockHeight: strconv.FormatUint(eventsResponse.Height, 10), + BlockTimestamp: eventsResponse.BlockTimestamp, + Events: eventsResponse.Events, + MessageIndex: strconv.FormatUint(index, 10), + } + return nil + } } // createSubscription creates a new subscription using the specified input arguments. diff --git a/engine/access/rest/websockets/data_providers/events_provider_test.go b/engine/access/rest/websockets/data_providers/events_provider_test.go index 7e5973be34a..fe7ccba6011 100644 --- a/engine/access/rest/websockets/data_providers/events_provider_test.go +++ b/engine/access/rest/websockets/data_providers/events_provider_test.go @@ -5,7 +5,6 @@ import ( "fmt" "strconv" "testing" - "time" "github.com/rs/zerolog" "github.com/stretchr/testify/mock" @@ -14,7 +13,9 @@ import ( "github.com/onflow/flow-go/engine/access/rest/websockets/models" "github.com/onflow/flow-go/engine/access/state_stream" + "github.com/onflow/flow-go/engine/access/state_stream/backend" ssmock "github.com/onflow/flow-go/engine/access/state_stream/mock" + "github.com/onflow/flow-go/engine/access/subscription" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/utils/unittest" ) @@ -46,7 +47,13 @@ func (s *EventsProviderSuite) SetupTest() { s.rootBlock = unittest.BlockFixture() s.rootBlock.Header.Height = 0 - s.factory = NewDataProviderFactory(s.log, s.api, nil, flow.Testnet.Chain(), state_stream.DefaultEventFilterConfig) + s.factory = NewDataProviderFactory( + s.log, + s.api, + nil, + flow.Testnet.Chain(), + state_stream.DefaultEventFilterConfig, + subscription.DefaultHeartbeatInterval) s.Require().NotNil(s.factory) } @@ -119,7 +126,7 @@ func (s *EventsProviderSuite) TestEventsDataProvider_HappyPath() { func (s *EventsProviderSuite) testHappyPath( topic string, tests []testType, - requireFn func(interface{}, *flow.Event), + requireFn func(interface{}, *backend.EventsResponse), ) { expectedEvents := []flow.Event{ unittest.EventFixture(flow.EventAccountCreated, 0, 0, unittest.IdentifierFixture(), 0), @@ -128,6 +135,18 @@ func (s *EventsProviderSuite) testHappyPath( unittest.EventFixture(flow.EventAccountUpdated, 0, 0, unittest.IdentifierFixture(), 0), } + var expectedEventsResponses []backend.EventsResponse + + for i := 0; i < len(expectedEvents); i++ { + expectedEventsResponses = append(expectedEventsResponses, backend.EventsResponse{ + Height: s.rootBlock.Header.Height, + BlockID: s.rootBlock.ID(), + Events: expectedEvents, + BlockTimestamp: s.rootBlock.Header.Timestamp, + }) + + } + for _, test := range tests { s.Run(test.name, func() { ctx := context.Background() @@ -157,19 +176,17 @@ func (s *EventsProviderSuite) testHappyPath( go func() { defer close(eventChan) - for i := 0; i < len(expectedEvents); i++ { - eventChan <- &expectedEvents[i] + for i := 0; i < len(expectedEventsResponses); i++ { + eventChan <- &expectedEventsResponses[i] } }() // Collect responses - for _, e := range expectedEvents { - unittest.RequireReturnsBefore(s.T(), func() { - v, ok := <-send - s.Require().True(ok, "channel closed while waiting for event %v: err: %v", e.ID(), sub.Err()) + for _, e := range expectedEventsResponses { + v, ok := <-send + s.Require().True(ok, "channel closed while waiting for event %v: err: %v", e.BlockID, sub.Err()) - requireFn(v, &e) - }, time.Second, fmt.Sprintf("timed out waiting for event %v ", e.ID())) + requireFn(v, &e) } // Ensure the provider is properly closed after the test @@ -179,11 +196,11 @@ func (s *EventsProviderSuite) testHappyPath( } // requireEvents ensures that the received event information matches the expected data. -func (s *EventsProviderSuite) requireEvents(v interface{}, expectedEvent *flow.Event) { +func (s *EventsProviderSuite) requireEvents(v interface{}, expectedEventsResponse *backend.EventsResponse) { actualResponse, ok := v.(*models.EventResponse) require.True(s.T(), ok, "Expected *models.EventResponse, got %T", v) - s.Require().Equal(expectedEvent, actualResponse.Event) + s.Require().ElementsMatch(expectedEventsResponse.Events, actualResponse.Events) } // invalidArgumentsTestCases returns a list of test cases with invalid argument combinations @@ -244,7 +261,8 @@ func (s *EventsProviderSuite) TestEventsDataProvider_InvalidArguments() { test.arguments, send, s.chain, - state_stream.DefaultEventFilterConfig) + state_stream.DefaultEventFilterConfig, + subscription.DefaultHeartbeatInterval) s.Require().Nil(provider) s.Require().Error(err) s.Require().Contains(err.Error(), test.expectedErrorMsg) @@ -283,7 +301,8 @@ func (s *EventsProviderSuite) TestMessageIndexEventProviderResponse_HappyPath() arguments, send, s.chain, - state_stream.DefaultEventFilterConfig) + state_stream.DefaultEventFilterConfig, + subscription.DefaultHeartbeatInterval) s.Require().NotNil(provider) s.Require().NoError(err) @@ -298,8 +317,8 @@ func (s *EventsProviderSuite) TestMessageIndexEventProviderResponse_HappyPath() defer close(eventChan) // Close the channel when done for i := 0; i < eventsCount; i++ { - eventChan <- &flow.Event{ - Type: "flow.AccountCreated", + eventChan <- &backend.EventsResponse{ + Height: s.rootBlock.Header.Height, } } }() diff --git a/engine/access/rest/websockets/data_providers/factory.go b/engine/access/rest/websockets/data_providers/factory.go index fc764e4c7fe..e39a6643d4e 100644 --- a/engine/access/rest/websockets/data_providers/factory.go +++ b/engine/access/rest/websockets/data_providers/factory.go @@ -47,6 +47,7 @@ type DataProviderFactoryImpl struct { chain flow.Chain eventFilterConfig state_stream.EventFilterConfig + heartbeatInterval uint64 } // NewDataProviderFactory creates a new DataProviderFactory @@ -62,6 +63,7 @@ func NewDataProviderFactory( accessApi access.API, chain flow.Chain, eventFilterConfig state_stream.EventFilterConfig, + heartbeatInterval uint64, ) *DataProviderFactoryImpl { return &DataProviderFactoryImpl{ logger: logger, @@ -69,6 +71,7 @@ func NewDataProviderFactory( accessApi: accessApi, chain: chain, eventFilterConfig: eventFilterConfig, + heartbeatInterval: heartbeatInterval, } } @@ -96,7 +99,7 @@ func (s *DataProviderFactoryImpl) NewDataProvider( case BlockDigestsTopic: return NewBlockDigestsDataProvider(ctx, s.logger, s.accessApi, topic, arguments, ch) case EventsTopic: - return NewEventsDataProvider(ctx, s.logger, s.stateStreamApi, topic, arguments, ch, s.chain, s.eventFilterConfig) + return NewEventsDataProvider(ctx, s.logger, s.stateStreamApi, topic, arguments, ch, s.chain, s.eventFilterConfig, s.heartbeatInterval) // TODO: Implemented handlers for each topic should be added in respective case case AccountStatusesTopic, TransactionStatusesTopic: diff --git a/engine/access/rest/websockets/data_providers/factory_test.go b/engine/access/rest/websockets/data_providers/factory_test.go index 9ea21f70270..38618a5c29a 100644 --- a/engine/access/rest/websockets/data_providers/factory_test.go +++ b/engine/access/rest/websockets/data_providers/factory_test.go @@ -13,6 +13,7 @@ import ( "github.com/onflow/flow-go/engine/access/rest/websockets/models" "github.com/onflow/flow-go/engine/access/state_stream" statestreammock "github.com/onflow/flow-go/engine/access/state_stream/mock" + "github.com/onflow/flow-go/engine/access/subscription" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/utils/unittest" ) @@ -46,7 +47,13 @@ func (s *DataProviderFactorySuite) SetupTest() { chain := flow.Testnet.Chain() - s.factory = NewDataProviderFactory(log, s.stateStreamApi, s.accessApi, chain, state_stream.DefaultEventFilterConfig) + s.factory = NewDataProviderFactory( + log, + s.stateStreamApi, + s.accessApi, + chain, + state_stream.DefaultEventFilterConfig, + subscription.DefaultHeartbeatInterval) s.Require().NotNil(s.factory) } diff --git a/engine/access/rest/websockets/models/event_models.go b/engine/access/rest/websockets/models/event_models.go index 0569ebaf4ae..48d085d9b85 100644 --- a/engine/access/rest/websockets/models/event_models.go +++ b/engine/access/rest/websockets/models/event_models.go @@ -1,9 +1,16 @@ package models -import "github.com/onflow/flow-go/model/flow" +import ( + "time" + + "github.com/onflow/flow-go/model/flow" +) // EventResponse is the response message for 'events' topic. type EventResponse struct { - Event *flow.Event `json:"event"` - MessageIndex string `json:"message_index"` + BlockId string `json:"block_id"` + BlockHeight string `json:"block_height"` + BlockTimestamp time.Time `json:"block_timestamp"` + Events []flow.Event `json:"events"` + MessageIndex string `json:"message_index"` } From 179a6646c1f821aaf39a9aa4ed4abbfa5a32bd59 Mon Sep 17 00:00:00 2001 From: Andrii Date: Thu, 5 Dec 2024 15:38:42 +0200 Subject: [PATCH 22/30] Fixed small remarks --- .../websockets/data_providers/blocks_provider_test.go | 3 ++- .../rest/websockets/data_providers/events_provider.go | 11 ++++------- .../websockets/data_providers/events_provider_test.go | 3 ++- .../rest/websockets/data_providers/factory_test.go | 3 ++- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/engine/access/rest/websockets/data_providers/blocks_provider_test.go b/engine/access/rest/websockets/data_providers/blocks_provider_test.go index c82ec6d47d0..51cf9e63e44 100644 --- a/engine/access/rest/websockets/data_providers/blocks_provider_test.go +++ b/engine/access/rest/websockets/data_providers/blocks_provider_test.go @@ -75,7 +75,8 @@ func (s *BlocksProviderSuite) SetupTest() { } s.finalizedBlock = parent - s.factory = NewDataProviderFactory(s.log, + s.factory = NewDataProviderFactory( + s.log, nil, s.api, flow.Testnet.Chain(), diff --git a/engine/access/rest/websockets/data_providers/events_provider.go b/engine/access/rest/websockets/data_providers/events_provider.go index 822b22dad3b..6305d614bde 100644 --- a/engine/access/rest/websockets/data_providers/events_provider.go +++ b/engine/access/rest/websockets/data_providers/events_provider.go @@ -8,8 +8,6 @@ import ( "strings" "github.com/rs/zerolog" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" "github.com/onflow/flow-go/engine/access/rest/common/parser" "github.com/onflow/flow-go/engine/access/rest/http/request" @@ -81,10 +79,10 @@ func NewEventsDataProvider( // // No errors are expected during normal operations. func (p *EventsDataProvider) Run() error { - return subscription.HandleSubscription(p.subscription, p.handleResponse(p.send)) + return subscription.HandleSubscription(p.subscription, p.handleResponse()) } -func (p *EventsDataProvider) handleResponse(send chan<- interface{}) func(eventsResponse *backend.EventsResponse) error { +func (p *EventsDataProvider) handleResponse() func(eventsResponse *backend.EventsResponse) error { blocksSinceLastMessage := uint64(0) messageIndex := counters.NewMonotonousCounter(1) @@ -101,10 +99,10 @@ func (p *EventsDataProvider) handleResponse(send chan<- interface{}) func(events index := messageIndex.Value() if ok := messageIndex.Set(messageIndex.Value() + 1); !ok { - return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value()) + return fmt.Errorf("message index already incremented to: %d", messageIndex.Value()) } - send <- &models.EventResponse{ + p.send <- &models.EventResponse{ BlockId: eventsResponse.BlockID.String(), BlockHeight: strconv.FormatUint(eventsResponse.Height, 10), BlockTimestamp: eventsResponse.BlockTimestamp, @@ -167,7 +165,6 @@ func ParseEventsArguments( } var eventTypes parser.EventTypes - // Parse 'event_types' as []string{} if eventTypesIn, ok := arguments["event_types"]; ok && eventTypesIn != "" { err := json.Unmarshal([]byte(eventTypesIn), &eventTypes) // Expect a JSON array if err != nil { diff --git a/engine/access/rest/websockets/data_providers/events_provider_test.go b/engine/access/rest/websockets/data_providers/events_provider_test.go index fe7ccba6011..bc928a601cf 100644 --- a/engine/access/rest/websockets/data_providers/events_provider_test.go +++ b/engine/access/rest/websockets/data_providers/events_provider_test.go @@ -262,7 +262,8 @@ func (s *EventsProviderSuite) TestEventsDataProvider_InvalidArguments() { send, s.chain, state_stream.DefaultEventFilterConfig, - subscription.DefaultHeartbeatInterval) + subscription.DefaultHeartbeatInterval, + ) s.Require().Nil(provider) s.Require().Error(err) s.Require().Contains(err.Error(), test.expectedErrorMsg) diff --git a/engine/access/rest/websockets/data_providers/factory_test.go b/engine/access/rest/websockets/data_providers/factory_test.go index 38618a5c29a..3323e3cc258 100644 --- a/engine/access/rest/websockets/data_providers/factory_test.go +++ b/engine/access/rest/websockets/data_providers/factory_test.go @@ -53,7 +53,8 @@ func (s *DataProviderFactorySuite) SetupTest() { s.accessApi, chain, state_stream.DefaultEventFilterConfig, - subscription.DefaultHeartbeatInterval) + subscription.DefaultHeartbeatInterval, + ) s.Require().NotNil(s.factory) } From e0aa808ee8fff8c180ba94c8be418cbc7442af0e Mon Sep 17 00:00:00 2001 From: Andrii Date: Thu, 5 Dec 2024 15:43:44 +0200 Subject: [PATCH 23/30] Made parse function private --- .../rest/websockets/data_providers/events_provider.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/engine/access/rest/websockets/data_providers/events_provider.go b/engine/access/rest/websockets/data_providers/events_provider.go index 6305d614bde..acd719f4f06 100644 --- a/engine/access/rest/websockets/data_providers/events_provider.go +++ b/engine/access/rest/websockets/data_providers/events_provider.go @@ -58,7 +58,7 @@ func NewEventsDataProvider( } // Initialize arguments passed to the provider. - eventArgs, err := ParseEventsArguments(arguments, chain, eventFilterConfig) + eventArgs, err := parseEventsArguments(arguments, chain, eventFilterConfig) if err != nil { return nil, fmt.Errorf("invalid arguments for events data provider: %w", err) } @@ -127,8 +127,8 @@ func (p *EventsDataProvider) createSubscription(ctx context.Context, args Events return p.stateStreamApi.SubscribeEventsFromLatest(ctx, args.Filter) } -// ParseEventsArguments validates and initializes the events arguments. -func ParseEventsArguments( +// parseEventsArguments validates and initializes the events arguments. +func parseEventsArguments( arguments models.Arguments, chain flow.Chain, eventFilterConfig state_stream.EventFilterConfig, From 19bf3c9b9c1580bf06bd71d153d45af8cbac5711 Mon Sep 17 00:00:00 2001 From: Andrii Date: Thu, 5 Dec 2024 19:16:05 +0200 Subject: [PATCH 24/30] Changed Arguments type and refactored code --- .../data_providers/blocks_provider.go | 23 ++++++++---- .../data_providers/events_provider.go | 35 +++++++++++++++---- .../data_providers/events_provider_test.go | 6 ++-- .../rest/websockets/models/subscribe.go | 4 ++- 4 files changed, 51 insertions(+), 17 deletions(-) diff --git a/engine/access/rest/websockets/data_providers/blocks_provider.go b/engine/access/rest/websockets/data_providers/blocks_provider.go index 72cfaa6f554..28e0a9a03d2 100644 --- a/engine/access/rest/websockets/data_providers/blocks_provider.go +++ b/engine/access/rest/websockets/data_providers/blocks_provider.go @@ -96,7 +96,11 @@ func ParseBlocksArguments(arguments models.Arguments) (BlocksArguments, error) { // Parse 'block_status' if blockStatusIn, ok := arguments["block_status"]; ok { - blockStatus, err := parser.ParseBlockStatus(blockStatusIn) + result, ok := blockStatusIn.(string) + if !ok { + return args, fmt.Errorf("'block_status' must be string") + } + blockStatus, err := parser.ParseBlockStatus(result) if err != nil { return args, err } @@ -113,24 +117,31 @@ func ParseBlocksArguments(arguments models.Arguments) (BlocksArguments, error) { return args, fmt.Errorf("can only provide either 'start_block_id' or 'start_block_height'") } - // Parse 'start_block_id' if provided if hasStartBlockID { + result, ok := startBlockIDIn.(string) + if !ok { + return args, fmt.Errorf("'start_block_id' must be a string") + } var startBlockID parser.ID - err := startBlockID.Parse(startBlockIDIn) + err := startBlockID.Parse(result) if err != nil { return args, err } args.StartBlockID = startBlockID.Flow() } - // Parse 'start_block_height' if provided if hasStartBlockHeight { - var err error - args.StartBlockHeight, err = util.ToUint64(startBlockHeightIn) + result, ok := startBlockHeightIn.(string) + if !ok { + return args, fmt.Errorf("'start_block_height' must be a string") + } + startBlockHeight, err := util.ToUint64(result) if err != nil { return args, fmt.Errorf("invalid 'start_block_height': %w", err) } + args.StartBlockHeight = startBlockHeight } else { + // Default value if 'start_block_height' is not provided args.StartBlockHeight = request.EmptyHeight } diff --git a/engine/access/rest/websockets/data_providers/events_provider.go b/engine/access/rest/websockets/data_providers/events_provider.go index acd719f4f06..6edf203bf2a 100644 --- a/engine/access/rest/websockets/data_providers/events_provider.go +++ b/engine/access/rest/websockets/data_providers/events_provider.go @@ -145,8 +145,12 @@ func parseEventsArguments( // Parse 'start_block_id' if provided if hasStartBlockID { + result, ok := startBlockIDIn.(string) + if !ok { + return args, fmt.Errorf("'start_block_id' must be a string") + } var startBlockID parser.ID - err := startBlockID.Parse(startBlockIDIn) + err := startBlockID.Parse(result) if err != nil { return args, fmt.Errorf("invalid 'start_block_id': %w", err) } @@ -155,23 +159,32 @@ func parseEventsArguments( // Parse 'start_block_height' if provided if hasStartBlockHeight { - var err error - args.StartBlockHeight, err = util.ToUint64(startBlockHeightIn) + result, ok := startBlockHeightIn.(string) + if !ok { + return args, fmt.Errorf("'start_block_height' must be a string") + } + startBlockHeight, err := util.ToUint64(result) if err != nil { return args, fmt.Errorf("invalid 'start_block_height': %w", err) } + args.StartBlockHeight = startBlockHeight } else { args.StartBlockHeight = request.EmptyHeight } + // Parse 'event_types' as a JSON array var eventTypes parser.EventTypes if eventTypesIn, ok := arguments["event_types"]; ok && eventTypesIn != "" { - err := json.Unmarshal([]byte(eventTypesIn), &eventTypes) // Expect a JSON array + result, ok := eventTypesIn.(string) + if !ok { + return args, fmt.Errorf("'event_types' must be a string") + } + err := json.Unmarshal([]byte(result), &eventTypes) // Expect a JSON array if err != nil { return args, fmt.Errorf("could not parse 'event_types': %w", err) } - err = eventTypes.Parse(strings.Split(eventTypesIn, ",")) + err = eventTypes.Parse(strings.Split(result, ",")) if err != nil { return args, fmt.Errorf("invalid 'event_types': %w", err) } @@ -180,7 +193,11 @@ func parseEventsArguments( // Parse 'addresses' as []string{} var addresses []string if addressesIn, ok := arguments["addresses"]; ok && addressesIn != "" { - err := json.Unmarshal([]byte(addressesIn), &addresses) // Expect a JSON array + result, ok := addressesIn.(string) + if !ok { + return args, fmt.Errorf("'addresses' must be a string") + } + err := json.Unmarshal([]byte(result), &addresses) // Expect a JSON array if err != nil { return args, fmt.Errorf("could not parse 'addresses': %w", err) } @@ -189,7 +206,11 @@ func parseEventsArguments( // Parse 'contracts' as []string{} var contracts []string if contractsIn, ok := arguments["contracts"]; ok && contractsIn != "" { - err := json.Unmarshal([]byte(contractsIn), &contracts) // Expect a JSON array + result, ok := contractsIn.(string) + if !ok { + return args, fmt.Errorf("'contracts' must be a string") + } + err := json.Unmarshal([]byte(result), &contracts) // Expect a JSON array if err != nil { return args, fmt.Errorf("could not parse 'contracts': %w", err) } diff --git a/engine/access/rest/websockets/data_providers/events_provider_test.go b/engine/access/rest/websockets/data_providers/events_provider_test.go index bc928a601cf..8721f407946 100644 --- a/engine/access/rest/websockets/data_providers/events_provider_test.go +++ b/engine/access/rest/websockets/data_providers/events_provider_test.go @@ -223,14 +223,14 @@ func (s *EventsProviderSuite) invalidArgumentsTestCases() []testErrType { }, { name: "invalid 'start_block_id' argument", - arguments: map[string]string{ + arguments: map[string]interface{}{ "start_block_id": "invalid_block_id", }, expectedErrorMsg: "invalid ID format", }, { name: "invalid 'start_block_height' argument", - arguments: map[string]string{ + arguments: map[string]interface{}{ "start_block_height": "-1", }, expectedErrorMsg: "value must be an unsigned 64 bit integer", @@ -289,7 +289,7 @@ func (s *EventsProviderSuite) TestMessageIndexEventProviderResponse_HappyPath() s.api.On("SubscribeEventsFromStartBlockID", mock.Anything, mock.Anything, mock.Anything).Return(sub) arguments := - map[string]string{ + map[string]interface{}{ "start_block_id": s.rootBlock.ID().String(), } diff --git a/engine/access/rest/websockets/models/subscribe.go b/engine/access/rest/websockets/models/subscribe.go index 95ad17e3708..85d28f6d8ab 100644 --- a/engine/access/rest/websockets/models/subscribe.go +++ b/engine/access/rest/websockets/models/subscribe.go @@ -1,6 +1,8 @@ package models -type Arguments map[string]string +//type Arguments map[string]string + +type Arguments map[string]interface{} // SubscribeMessageRequest represents a request to subscribe to a topic. type SubscribeMessageRequest struct { From 8d0543d6e8d69cd4058028741aecae2d8acc4643 Mon Sep 17 00:00:00 2001 From: Andrii Date: Thu, 5 Dec 2024 19:16:30 +0200 Subject: [PATCH 25/30] Removed comment --- engine/access/rest/websockets/models/subscribe.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/engine/access/rest/websockets/models/subscribe.go b/engine/access/rest/websockets/models/subscribe.go index 85d28f6d8ab..03b37aee5f1 100644 --- a/engine/access/rest/websockets/models/subscribe.go +++ b/engine/access/rest/websockets/models/subscribe.go @@ -1,7 +1,5 @@ package models -//type Arguments map[string]string - type Arguments map[string]interface{} // SubscribeMessageRequest represents a request to subscribe to a topic. From d6fc9dfe6aff11f655dea036ad495f8b138f8985 Mon Sep 17 00:00:00 2001 From: Andrii Date: Fri, 6 Dec 2024 15:56:02 +0200 Subject: [PATCH 26/30] Fixed parse function for event privoder --- .../data_providers/events_provider.go | 27 +++++-------------- .../data_providers/events_provider_test.go | 1 + 2 files changed, 7 insertions(+), 21 deletions(-) diff --git a/engine/access/rest/websockets/data_providers/events_provider.go b/engine/access/rest/websockets/data_providers/events_provider.go index 6edf203bf2a..dfbeb5c7c44 100644 --- a/engine/access/rest/websockets/data_providers/events_provider.go +++ b/engine/access/rest/websockets/data_providers/events_provider.go @@ -2,12 +2,9 @@ package data_providers import ( "context" - "encoding/json" "fmt" - "strconv" - "strings" - "github.com/rs/zerolog" + "strconv" "github.com/onflow/flow-go/engine/access/rest/common/parser" "github.com/onflow/flow-go/engine/access/rest/http/request" @@ -175,16 +172,12 @@ func parseEventsArguments( // Parse 'event_types' as a JSON array var eventTypes parser.EventTypes if eventTypesIn, ok := arguments["event_types"]; ok && eventTypesIn != "" { - result, ok := eventTypesIn.(string) + result, ok := eventTypesIn.([]string) if !ok { - return args, fmt.Errorf("'event_types' must be a string") - } - err := json.Unmarshal([]byte(result), &eventTypes) // Expect a JSON array - if err != nil { - return args, fmt.Errorf("could not parse 'event_types': %w", err) + return args, fmt.Errorf("'event_types' must be an array of string") } - err = eventTypes.Parse(strings.Split(result, ",")) + err := eventTypes.Parse(result) if err != nil { return args, fmt.Errorf("invalid 'event_types': %w", err) } @@ -193,27 +186,19 @@ func parseEventsArguments( // Parse 'addresses' as []string{} var addresses []string if addressesIn, ok := arguments["addresses"]; ok && addressesIn != "" { - result, ok := addressesIn.(string) + addresses, ok = addressesIn.([]string) if !ok { return args, fmt.Errorf("'addresses' must be a string") } - err := json.Unmarshal([]byte(result), &addresses) // Expect a JSON array - if err != nil { - return args, fmt.Errorf("could not parse 'addresses': %w", err) - } } // Parse 'contracts' as []string{} var contracts []string if contractsIn, ok := arguments["contracts"]; ok && contractsIn != "" { - result, ok := contractsIn.(string) + contracts, ok = contractsIn.([]string) if !ok { return args, fmt.Errorf("'contracts' must be a string") } - err := json.Unmarshal([]byte(result), &contracts) // Expect a JSON array - if err != nil { - return args, fmt.Errorf("could not parse 'contracts': %w", err) - } } // Initialize the event filter with the parsed arguments diff --git a/engine/access/rest/websockets/data_providers/events_provider_test.go b/engine/access/rest/websockets/data_providers/events_provider_test.go index 8721f407946..c1728eef4a9 100644 --- a/engine/access/rest/websockets/data_providers/events_provider_test.go +++ b/engine/access/rest/websockets/data_providers/events_provider_test.go @@ -64,6 +64,7 @@ func (s *EventsProviderSuite) subscribeEventsDataProviderTestCases() []testType name: "SubscribeBlocksFromStartBlockID happy path", arguments: models.Arguments{ "start_block_id": s.rootBlock.ID().String(), + "event_types": []string{"flow.AccountCreated", "flow.AccountUpdated"}, }, setupBackend: func(sub *ssmock.Subscription) { s.api.On( From 1bb4112033e66943fe4d8efcb706bea51f805543 Mon Sep 17 00:00:00 2001 From: Andrii Date: Mon, 9 Dec 2024 12:20:20 +0200 Subject: [PATCH 27/30] Linted --- .../access/rest/websockets/data_providers/events_provider.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/engine/access/rest/websockets/data_providers/events_provider.go b/engine/access/rest/websockets/data_providers/events_provider.go index dfbeb5c7c44..9431ca9a4dc 100644 --- a/engine/access/rest/websockets/data_providers/events_provider.go +++ b/engine/access/rest/websockets/data_providers/events_provider.go @@ -3,9 +3,10 @@ package data_providers import ( "context" "fmt" - "github.com/rs/zerolog" "strconv" + "github.com/rs/zerolog" + "github.com/onflow/flow-go/engine/access/rest/common/parser" "github.com/onflow/flow-go/engine/access/rest/http/request" "github.com/onflow/flow-go/engine/access/rest/util" From f81c794c176e09ed9ab40bf4bc2f5aa8156ba1ef Mon Sep 17 00:00:00 2001 From: Andrii Date: Mon, 9 Dec 2024 13:28:14 +0200 Subject: [PATCH 28/30] Changed parse args function, added hearbeat for hadnling --- .../account_statuses_provider.go | 73 +++++++++++++------ .../data_providers/events_provider.go | 7 +- 2 files changed, 57 insertions(+), 23 deletions(-) diff --git a/engine/access/rest/websockets/data_providers/account_statuses_provider.go b/engine/access/rest/websockets/data_providers/account_statuses_provider.go index 1656fb415a7..8261e77a1ef 100644 --- a/engine/access/rest/websockets/data_providers/account_statuses_provider.go +++ b/engine/access/rest/websockets/data_providers/account_statuses_provider.go @@ -3,12 +3,10 @@ package data_providers import ( "context" "fmt" - "strconv" - "strings" - "github.com/rs/zerolog" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "strconv" "github.com/onflow/flow-go/engine/access/rest/common/parser" "github.com/onflow/flow-go/engine/access/rest/http/request" @@ -32,6 +30,8 @@ type AccountStatusesDataProvider struct { logger zerolog.Logger stateStreamApi state_stream.API + + heartbeatInterval uint64 } var _ DataProvider = (*AccountStatusesDataProvider)(nil) @@ -41,17 +41,20 @@ func NewAccountStatusesDataProvider( ctx context.Context, logger zerolog.Logger, stateStreamApi state_stream.API, - chain flow.Chain, - eventFilterConfig state_stream.EventFilterConfig, topic string, arguments models.Arguments, send chan<- interface{}, + chain flow.Chain, + eventFilterConfig state_stream.EventFilterConfig, + heartbeatInterval uint64, ) (*AccountStatusesDataProvider, error) { p := &AccountStatusesDataProvider{ - logger: logger.With().Str("component", "account-statuses-data-provider").Logger(), - stateStreamApi: stateStreamApi, + logger: logger.With().Str("component", "account-statuses-data-provider").Logger(), + stateStreamApi: stateStreamApi, + heartbeatInterval: heartbeatInterval, } + // Initialize arguments passed to the provider. accountStatusesArgs, err := ParseAccountStatusesArguments(arguments, chain, eventFilterConfig) if err != nil { return nil, fmt.Errorf("invalid arguments for account statuses data provider: %w", err) @@ -73,7 +76,7 @@ func NewAccountStatusesDataProvider( // // No errors are expected during normal operations. func (p *AccountStatusesDataProvider) Run() error { - return subscription.HandleSubscription(p.subscription, p.handleResponse(p.send)) + return subscription.HandleSubscription(p.subscription, p.handleResponse()) } // createSubscription creates a new subscription using the specified input arguments. @@ -92,16 +95,27 @@ func (p *AccountStatusesDataProvider) createSubscription(ctx context.Context, ar // handleResponse processes an account statuses and sends the formatted response. // // No errors are expected during normal operations. -func (p *AccountStatusesDataProvider) handleResponse(send chan<- interface{}) func(accountStatusesResponse *backend.AccountStatusesResponse) error { +func (p *AccountStatusesDataProvider) handleResponse() func(accountStatusesResponse *backend.AccountStatusesResponse) error { + blocksSinceLastMessage := uint64(0) messageIndex := counters.NewMonotonousCounter(1) return func(accountStatusesResponse *backend.AccountStatusesResponse) error { + // check if there are any events in the response. if not, do not send a message unless the last + // response was more than HeartbeatInterval blocks ago + if len(accountStatusesResponse.AccountEvents) == 0 { + blocksSinceLastMessage++ + if blocksSinceLastMessage < p.heartbeatInterval { + return nil + } + blocksSinceLastMessage = 0 + } + index := messageIndex.Value() if ok := messageIndex.Set(messageIndex.Value() + 1); !ok { return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value()) } - send <- &models.AccountStatusesResponse{ + p.send <- &models.AccountStatusesResponse{ BlockID: accountStatusesResponse.BlockID.String(), Height: strconv.FormatUint(accountStatusesResponse.Height, 10), AccountEvents: accountStatusesResponse.AccountEvents, @@ -112,8 +126,8 @@ func (p *AccountStatusesDataProvider) handleResponse(send chan<- interface{}) fu } } -// ParseAccountStatusesArguments validates and initializes the account statuses arguments. -func ParseAccountStatusesArguments( +// parseAccountStatusesArguments validates and initializes the account statuses arguments. +func parseAccountStatusesArguments( arguments models.Arguments, chain flow.Chain, eventFilterConfig state_stream.EventFilterConfig, @@ -121,8 +135,8 @@ func ParseAccountStatusesArguments( var args AccountStatusesArguments // Check for mutual exclusivity of start_block_id and start_block_height early - _, hasStartBlockID := arguments["start_block_id"] - _, hasStartBlockHeight := arguments["start_block_height"] + startBlockIDIn, hasStartBlockID := arguments["start_block_id"] + startBlockHeightIn, hasStartBlockHeight := arguments["start_block_height"] if hasStartBlockID && hasStartBlockHeight { return args, fmt.Errorf("can only provide either 'start_block_id' or 'start_block_height'") @@ -130,29 +144,43 @@ func ParseAccountStatusesArguments( // Parse 'start_block_id' if provided if hasStartBlockID { + result, ok := startBlockIDIn.(string) + if !ok { + return args, fmt.Errorf("'start_block_id' must be a string") + } var startBlockID parser.ID - err := startBlockID.Parse(arguments["start_block_id"]) + err := startBlockID.Parse(result) if err != nil { return args, fmt.Errorf("invalid 'start_block_id': %w", err) } args.StartBlockID = startBlockID.Flow() } + // Parse 'start_block_height' if provided // Parse 'start_block_height' if provided if hasStartBlockHeight { - var err error - args.StartBlockHeight, err = util.ToUint64(arguments["start_block_height"]) + result, ok := startBlockHeightIn.(string) + if !ok { + return args, fmt.Errorf("'start_block_height' must be a string") + } + startBlockHeight, err := util.ToUint64(result) if err != nil { return args, fmt.Errorf("invalid 'start_block_height': %w", err) } + args.StartBlockHeight = startBlockHeight } else { args.StartBlockHeight = request.EmptyHeight } - // Parse 'event_types' as []string{} + // Parse 'event_types' as a JSON array var eventTypes parser.EventTypes if eventTypesIn, ok := arguments["event_types"]; ok && eventTypesIn != "" { - err := eventTypes.Parse(strings.Split(eventTypesIn, ",")) + result, ok := eventTypesIn.([]string) + if !ok { + return args, fmt.Errorf("'event_types' must be an array of string") + } + + err := eventTypes.Parse(result) if err != nil { return args, fmt.Errorf("invalid 'event_types': %w", err) } @@ -160,8 +188,11 @@ func ParseAccountStatusesArguments( // Parse 'accountAddresses' as []string{} var accountAddresses []string - if addressesIn, ok := arguments["accountAddresses"]; ok && addressesIn != "" { - accountAddresses = strings.Split(addressesIn, ",") + if accountAddressesIn, ok := arguments["account_addresses"]; ok && accountAddressesIn != "" { + accountAddresses, ok = accountAddressesIn.([]string) + if !ok { + return args, fmt.Errorf("'account_addresses' must be an array of string") + } } // Initialize the event filter with the parsed arguments diff --git a/engine/access/rest/websockets/data_providers/events_provider.go b/engine/access/rest/websockets/data_providers/events_provider.go index 9431ca9a4dc..6b62f45ffac 100644 --- a/engine/access/rest/websockets/data_providers/events_provider.go +++ b/engine/access/rest/websockets/data_providers/events_provider.go @@ -80,6 +80,9 @@ func (p *EventsDataProvider) Run() error { return subscription.HandleSubscription(p.subscription, p.handleResponse()) } +// handleResponse processes events and sends the formatted response. +// +// No errors are expected during normal operations. func (p *EventsDataProvider) handleResponse() func(eventsResponse *backend.EventsResponse) error { blocksSinceLastMessage := uint64(0) messageIndex := counters.NewMonotonousCounter(1) @@ -189,7 +192,7 @@ func parseEventsArguments( if addressesIn, ok := arguments["addresses"]; ok && addressesIn != "" { addresses, ok = addressesIn.([]string) if !ok { - return args, fmt.Errorf("'addresses' must be a string") + return args, fmt.Errorf("'addresses' must be an array of string") } } @@ -198,7 +201,7 @@ func parseEventsArguments( if contractsIn, ok := arguments["contracts"]; ok && contractsIn != "" { contracts, ok = contractsIn.([]string) if !ok { - return args, fmt.Errorf("'contracts' must be a string") + return args, fmt.Errorf("'contracts' must be an array of string") } } From 50fd2897631007e004ccde193fca7b5d7bd64ee1 Mon Sep 17 00:00:00 2001 From: Andrii Date: Mon, 9 Dec 2024 13:29:38 +0200 Subject: [PATCH 29/30] Fixed error msg --- .../access/rest/websockets/data_providers/events_provider.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/engine/access/rest/websockets/data_providers/events_provider.go b/engine/access/rest/websockets/data_providers/events_provider.go index 9431ca9a4dc..b3cbb41d635 100644 --- a/engine/access/rest/websockets/data_providers/events_provider.go +++ b/engine/access/rest/websockets/data_providers/events_provider.go @@ -189,7 +189,7 @@ func parseEventsArguments( if addressesIn, ok := arguments["addresses"]; ok && addressesIn != "" { addresses, ok = addressesIn.([]string) if !ok { - return args, fmt.Errorf("'addresses' must be a string") + return args, fmt.Errorf("'addresses' must be an array of string") } } @@ -198,7 +198,7 @@ func parseEventsArguments( if contractsIn, ok := arguments["contracts"]; ok && contractsIn != "" { contracts, ok = contractsIn.([]string) if !ok { - return args, fmt.Errorf("'contracts' must be a string") + return args, fmt.Errorf("'contracts' must be an array of string") } } From 37836f9c6b323d0c0ceec2ea090b5b4a1cdf095f Mon Sep 17 00:00:00 2001 From: Andrii Date: Mon, 9 Dec 2024 18:18:40 +0200 Subject: [PATCH 30/30] Added happy path cases, fixed remarks from event provider PR --- .../account_statuses_provider.go | 7 +- .../account_statuses_provider_test.go | 181 +++++++++++++++--- .../data_providers/events_provider_test.go | 32 ++-- .../rest/websockets/data_providers/factory.go | 2 +- 4 files changed, 171 insertions(+), 51 deletions(-) diff --git a/engine/access/rest/websockets/data_providers/account_statuses_provider.go b/engine/access/rest/websockets/data_providers/account_statuses_provider.go index 8261e77a1ef..1a3aee203c9 100644 --- a/engine/access/rest/websockets/data_providers/account_statuses_provider.go +++ b/engine/access/rest/websockets/data_providers/account_statuses_provider.go @@ -3,10 +3,11 @@ package data_providers import ( "context" "fmt" + "strconv" + "github.com/rs/zerolog" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "strconv" "github.com/onflow/flow-go/engine/access/rest/common/parser" "github.com/onflow/flow-go/engine/access/rest/http/request" @@ -55,7 +56,7 @@ func NewAccountStatusesDataProvider( } // Initialize arguments passed to the provider. - accountStatusesArgs, err := ParseAccountStatusesArguments(arguments, chain, eventFilterConfig) + accountStatusesArgs, err := parseAccountStatusesArguments(arguments, chain, eventFilterConfig) if err != nil { return nil, fmt.Errorf("invalid arguments for account statuses data provider: %w", err) } @@ -66,7 +67,7 @@ func NewAccountStatusesDataProvider( topic, cancel, send, - p.createSubscription(subCtx, accountStatusesArgs), // Set up a subscription to events based on arguments. + p.createSubscription(subCtx, accountStatusesArgs), // Set up a subscription to account statuses based on arguments. ) return p, nil diff --git a/engine/access/rest/websockets/data_providers/account_statuses_provider_test.go b/engine/access/rest/websockets/data_providers/account_statuses_provider_test.go index 5a15949bfad..aeadd68f649 100644 --- a/engine/access/rest/websockets/data_providers/account_statuses_provider_test.go +++ b/engine/access/rest/websockets/data_providers/account_statuses_provider_test.go @@ -2,18 +2,19 @@ package data_providers import ( "context" - "fmt" "strconv" "testing" "github.com/rs/zerolog" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/onflow/flow-go/engine/access/rest/websockets/models" "github.com/onflow/flow-go/engine/access/state_stream" "github.com/onflow/flow-go/engine/access/state_stream/backend" ssmock "github.com/onflow/flow-go/engine/access/state_stream/mock" + "github.com/onflow/flow-go/engine/access/subscription" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/utils/unittest" ) @@ -28,6 +29,8 @@ type AccountStatusesProviderSuite struct { chain flow.Chain rootBlock flow.Block finalizedBlock *flow.Header + + factory *DataProviderFactoryImpl } func TestNewAccountStatusesDataProvider(t *testing.T) { @@ -42,43 +45,155 @@ func (s *AccountStatusesProviderSuite) SetupTest() { s.rootBlock = unittest.BlockFixture() s.rootBlock.Header.Height = 0 + + s.factory = NewDataProviderFactory( + s.log, + s.api, + nil, + flow.Testnet.Chain(), + state_stream.DefaultEventFilterConfig, + subscription.DefaultHeartbeatInterval) + s.Require().NotNil(s.factory) +} + +// TestAccountStatusesDataProvider_HappyPath tests the behavior of the account statuses data provider +// when it is configured correctly and operating under normal conditions. It +// validates that events are correctly streamed to the channel and ensures +// no unexpected errors occur. +func (s *AccountStatusesProviderSuite) TestAccountStatusesDataProvider_HappyPath() { + s.testHappyPath( + AccountStatusesTopic, + s.subscribeAccountStatusesDataProviderTestCases(), + s.requireAccountStatuses, + ) +} + +func (s *AccountStatusesProviderSuite) testHappyPath( + topic string, + tests []testType, + requireFn func(interface{}, *backend.AccountStatusesResponse), +) { + expectedEvents := []flow.Event{ + unittest.EventFixture(state_stream.CoreEventAccountCreated, 0, 0, unittest.IdentifierFixture(), 0), + unittest.EventFixture(state_stream.CoreEventAccountKeyAdded, 0, 0, unittest.IdentifierFixture(), 0), + } + + var expectedAccountStatusesResponses []backend.AccountStatusesResponse + + for i := 0; i < len(expectedEvents); i++ { + expectedAccountStatusesResponses = append(expectedAccountStatusesResponses, backend.AccountStatusesResponse{ + Height: s.rootBlock.Header.Height, + BlockID: s.rootBlock.ID(), + AccountEvents: map[string]flow.EventsList{ + unittest.RandomAddressFixture().String(): expectedEvents, + }, + }) + } + + for _, test := range tests { + s.Run(test.name, func() { + ctx := context.Background() + send := make(chan interface{}, 10) + + // Create a channel to simulate the subscription's data channel + accStatusesChan := make(chan interface{}) + + // // Create a mock subscription and mock the channel + sub := ssmock.NewSubscription(s.T()) + sub.On("Channel").Return((<-chan interface{})(accStatusesChan)) + sub.On("Err").Return(nil) + test.setupBackend(sub) + + // Create the data provider instance + provider, err := s.factory.NewDataProvider(ctx, topic, test.arguments, send) + s.Require().NotNil(provider) + s.Require().NoError(err) + + // Run the provider in a separate goroutine + go func() { + err = provider.Run() + s.Require().NoError(err) + }() + + // Simulate emitting data to the events channel + go func() { + defer close(accStatusesChan) + + for i := 0; i < len(expectedAccountStatusesResponses); i++ { + accStatusesChan <- &expectedAccountStatusesResponses[i] + } + }() + + // Collect responses + for _, e := range expectedAccountStatusesResponses { + v, ok := <-send + s.Require().True(ok, "channel closed while waiting for event %v: err: %v", e.BlockID, sub.Err()) + + requireFn(v, &e) + } + + // Ensure the provider is properly closed after the test + provider.Close() + }) + } } -// invalidArgumentsTestCases returns a list of test cases with invalid argument combinations -// for testing the behavior of account statuses data providers. Each test case includes a name, -// a set of input arguments, and the expected error message that should be returned. -// -// The test cases cover scenarios such as: -// 1. Supplying both 'start_block_id' and 'start_block_height' simultaneously, which is not allowed. -// 2. Providing invalid 'start_block_id' value. -// 3. Providing invalid 'start_block_height' value. -func (s *AccountStatusesProviderSuite) invalidArgumentsTestCases() []testErrType { - return []testErrType{ +func (s *AccountStatusesProviderSuite) subscribeAccountStatusesDataProviderTestCases() []testType { + return []testType{ { - name: "provide both 'start_block_id' and 'start_block_height' arguments", + name: "SubscribeAccountStatusesFromStartBlockID happy path", arguments: models.Arguments{ - "start_block_id": s.rootBlock.ID().String(), - "start_block_height": fmt.Sprintf("%d", s.rootBlock.Header.Height), + "start_block_id": s.rootBlock.ID().String(), + "event_types": []string{"flow.AccountCreated", "flow.AccountKeyAdded"}, + }, + setupBackend: func(sub *ssmock.Subscription) { + s.api.On( + "SubscribeAccountStatusesFromStartBlockID", + mock.Anything, + s.rootBlock.ID(), + mock.Anything, + ).Return(sub).Once() }, - expectedErrorMsg: "can only provide either 'start_block_id' or 'start_block_height'", }, { - name: "invalid 'start_block_id' argument", - arguments: map[string]string{ - "start_block_id": "invalid_block_id", + name: "SubscribeAccountStatusesFromStartHeight happy path", + arguments: models.Arguments{ + "start_block_height": strconv.FormatUint(s.rootBlock.Header.Height, 10), + }, + setupBackend: func(sub *ssmock.Subscription) { + s.api.On( + "SubscribeAccountStatusesFromStartHeight", + mock.Anything, + s.rootBlock.Header.Height, + mock.Anything, + ).Return(sub).Once() }, - expectedErrorMsg: "invalid ID format", }, { - name: "invalid 'start_block_height' argument", - arguments: map[string]string{ - "start_block_height": "-1", + name: "SubscribeAccountStatusesFromLatestBlock happy path", + arguments: models.Arguments{}, + setupBackend: func(sub *ssmock.Subscription) { + s.api.On( + "SubscribeAccountStatusesFromLatestBlock", + mock.Anything, + mock.Anything, + ).Return(sub).Once() }, - expectedErrorMsg: "value must be an unsigned 64 bit integer", }, } } +// requireAccountStatuses ensures that the received account statuses information matches the expected data. +func (s *AccountStatusesProviderSuite) requireAccountStatuses( + v interface{}, + expectedAccountStatusesResponse *backend.AccountStatusesResponse, +) { + _, ok := v.(*models.AccountStatusesResponse) + require.True(s.T(), ok, "Expected *models.AccountStatusesResponse, got %T", v) + + //s.Require().ElementsMatch(expectedAccountStatusesResponse.AccountEvents, actualResponse.AccountEvents) +} + // TestAccountStatusesDataProvider_InvalidArguments tests the behavior of the account statuses data provider // when invalid arguments are provided. It verifies that appropriate errors are returned // for missing or conflicting arguments. @@ -92,17 +207,19 @@ func (s *AccountStatusesProviderSuite) TestAccountStatusesDataProvider_InvalidAr topic := AccountStatusesTopic - for _, test := range s.invalidArgumentsTestCases() { + for _, test := range invalidArgumentsTestCases() { s.Run(test.name, func() { provider, err := NewAccountStatusesDataProvider( ctx, s.log, s.api, - s.chain, - state_stream.DefaultEventFilterConfig, topic, test.arguments, - send) + send, + s.chain, + state_stream.DefaultEventFilterConfig, + subscription.DefaultHeartbeatInterval, + ) s.Require().Nil(provider) s.Require().Error(err) s.Require().Contains(err.Error(), test.expectedErrorMsg) @@ -128,7 +245,7 @@ func (s *AccountStatusesProviderSuite) TestMessageIndexAccountStatusesProviderRe s.api.On("SubscribeAccountStatusesFromStartBlockID", mock.Anything, mock.Anything, mock.Anything).Return(sub) arguments := - map[string]string{ + map[string]interface{}{ "start_block_id": s.rootBlock.ID().String(), } @@ -137,11 +254,13 @@ func (s *AccountStatusesProviderSuite) TestMessageIndexAccountStatusesProviderRe ctx, s.log, s.api, - s.chain, - state_stream.DefaultEventFilterConfig, topic, arguments, - send) + send, + s.chain, + state_stream.DefaultEventFilterConfig, + subscription.DefaultHeartbeatInterval, + ) s.Require().NotNil(provider) s.Require().NoError(err) diff --git a/engine/access/rest/websockets/data_providers/events_provider_test.go b/engine/access/rest/websockets/data_providers/events_provider_test.go index c1728eef4a9..336744d24c7 100644 --- a/engine/access/rest/websockets/data_providers/events_provider_test.go +++ b/engine/access/rest/websockets/data_providers/events_provider_test.go @@ -57,6 +57,18 @@ func (s *EventsProviderSuite) SetupTest() { s.Require().NotNil(s.factory) } +// TestEventsDataProvider_HappyPath tests the behavior of the events data provider +// when it is configured correctly and operating under normal conditions. It +// validates that events are correctly streamed to the channel and ensures +// no unexpected errors occur. +func (s *EventsProviderSuite) TestEventsDataProvider_HappyPath() { + s.testHappyPath( + EventsTopic, + s.subscribeEventsDataProviderTestCases(), + s.requireEvents, + ) +} + // subscribeEventsDataProviderTestCases generates test cases for events data providers. func (s *EventsProviderSuite) subscribeEventsDataProviderTestCases() []testType { return []testType{ @@ -103,18 +115,6 @@ func (s *EventsProviderSuite) subscribeEventsDataProviderTestCases() []testType } } -// TestEventsDataProvider_HappyPath tests the behavior of the events data provider -// when it is configured correctly and operating under normal conditions. It -// validates that events are correctly streamed to the channel and ensures -// no unexpected errors occur. -func (s *EventsProviderSuite) TestEventsDataProvider_HappyPath() { - s.testHappyPath( - EventsTopic, - s.subscribeEventsDataProviderTestCases(), - s.requireEvents, - ) -} - // testHappyPath tests a variety of scenarios for data providers in // happy path scenarios. This function runs parameterized test cases that // simulate various configurations and verifies that the data provider operates @@ -212,13 +212,13 @@ func (s *EventsProviderSuite) requireEvents(v interface{}, expectedEventsRespons // 1. Supplying both 'start_block_id' and 'start_block_height' simultaneously, which is not allowed. // 2. Providing invalid 'start_block_id' value. // 3. Providing invalid 'start_block_height' value. -func (s *EventsProviderSuite) invalidArgumentsTestCases() []testErrType { +func invalidArgumentsTestCases() []testErrType { return []testErrType{ { name: "provide both 'start_block_id' and 'start_block_height' arguments", arguments: models.Arguments{ - "start_block_id": s.rootBlock.ID().String(), - "start_block_height": fmt.Sprintf("%d", s.rootBlock.Header.Height), + "start_block_id": unittest.BlockFixture().ID().String(), + "start_block_height": fmt.Sprintf("%d", unittest.BlockFixture().Header.Height), }, expectedErrorMsg: "can only provide either 'start_block_id' or 'start_block_height'", }, @@ -252,7 +252,7 @@ func (s *EventsProviderSuite) TestEventsDataProvider_InvalidArguments() { topic := EventsTopic - for _, test := range s.invalidArgumentsTestCases() { + for _, test := range invalidArgumentsTestCases() { s.Run(test.name, func() { provider, err := NewEventsDataProvider( ctx, diff --git a/engine/access/rest/websockets/data_providers/factory.go b/engine/access/rest/websockets/data_providers/factory.go index 4c014d3a3b5..26aade4e090 100644 --- a/engine/access/rest/websockets/data_providers/factory.go +++ b/engine/access/rest/websockets/data_providers/factory.go @@ -101,7 +101,7 @@ func (s *DataProviderFactoryImpl) NewDataProvider( case EventsTopic: return NewEventsDataProvider(ctx, s.logger, s.stateStreamApi, topic, arguments, ch, s.chain, s.eventFilterConfig, s.heartbeatInterval) case AccountStatusesTopic: - return NewAccountStatusesDataProvider(ctx, s.logger, s.stateStreamApi, s.chain, s.eventFilterConfig, topic, arguments, ch) + return NewAccountStatusesDataProvider(ctx, s.logger, s.stateStreamApi, topic, arguments, ch, s.chain, s.eventFilterConfig, s.heartbeatInterval) case TransactionStatusesTopic: // TODO: Implemented handlers for each topic should be added in respective case return nil, fmt.Errorf(`topic "%s" not implemented yet`, topic)